about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/core/pipeline/transport')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/__init__.py120
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py571
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base.py863
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_async.py171
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_requests_async.py55
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_bigger_block_size_http_adapters.py48
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_asyncio.py296
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_basic.py421
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_trio.py311
9 files changed, 2856 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/__init__.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/__init__.py
new file mode 100644
index 00000000..f60e260a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/__init__.py
@@ -0,0 +1,120 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from typing import List, Optional, Any
+from ._base import HttpTransport, HttpRequest, HttpResponse
+from ._base_async import AsyncHttpTransport, AsyncHttpResponse
+
+# pylint: disable=undefined-all-variable
+
+__all__ = [
+    "HttpTransport",
+    "HttpRequest",
+    "HttpResponse",
+    "RequestsTransport",
+    "RequestsTransportResponse",
+    "AsyncHttpTransport",
+    "AsyncHttpResponse",
+    "AsyncioRequestsTransport",
+    "AsyncioRequestsTransportResponse",
+    "TrioRequestsTransport",
+    "TrioRequestsTransportResponse",
+    "AioHttpTransport",
+    "AioHttpTransportResponse",
+]
+
+# pylint: disable= no-member, too-many-statements
+
+
def __dir__() -> List[str]:
    """Return this module's public attribute names, i.e. the contents of ``__all__``."""
    return __all__
+
+
+# To do nice overloads, need https://github.com/python/mypy/issues/8203
+
+
def __getattr__(name: str):
    """Lazily resolve the optional transport classes on first attribute access.

    The concrete transports depend on optional packages (``requests``,
    ``aiohttp``, ``trio``), so each class is only imported when it is actually
    requested, and a missing dependency is reported with a targeted message.

    :param str name: The attribute being looked up on this module.
    :return: The transport class matching *name*.
    :raises ImportError: If the package backing the requested transport is not installed.
    :raises AttributeError: If *name* is not a known transport attribute.
    """
    if name == "AsyncioRequestsTransport":
        try:
            from ._requests_asyncio import AsyncioRequestsTransport as transport
        except ImportError as err:
            raise ImportError("requests package is not installed") from err
        return transport
    if name == "AsyncioRequestsTransportResponse":
        try:
            from ._requests_asyncio import AsyncioRequestsTransportResponse as transport
        except ImportError as err:
            raise ImportError("requests package is not installed") from err
        return transport
    if name == "RequestsTransport":
        try:
            from ._requests_basic import RequestsTransport as transport
        except ImportError as err:
            raise ImportError("requests package is not installed") from err
        return transport
    if name == "RequestsTransportResponse":
        try:
            from ._requests_basic import RequestsTransportResponse as transport
        except ImportError as err:
            raise ImportError("requests package is not installed") from err
        return transport
    if name == "AioHttpTransport":
        try:
            from ._aiohttp import AioHttpTransport as transport
        except ImportError as err:
            raise ImportError("aiohttp package is not installed") from err
        return transport
    if name == "AioHttpTransportResponse":
        try:
            from ._aiohttp import AioHttpTransportResponse as transport
        except ImportError as err:
            raise ImportError("aiohttp package is not installed") from err
        return transport
    if name == "TrioRequestsTransport":
        try:
            from ._requests_trio import TrioRequestsTransport as transport
        except ImportError as ex:
            # The trio transport sits on top of requests; tell the user which
            # of the two packages is actually missing.
            if ex.msg.endswith("'requests'"):
                raise ImportError("requests package is not installed") from ex
            raise ImportError("trio package is not installed") from ex
        return transport
    if name == "TrioRequestsTransportResponse":
        try:
            from ._requests_trio import TrioRequestsTransportResponse as transport
        except ImportError as ex:
            # Consistency fix: mirror the TrioRequestsTransport branch above so
            # a missing requests package is reported accurately here as well.
            if ex.msg.endswith("'requests'"):
                raise ImportError("requests package is not installed") from ex
            raise ImportError("trio package is not installed") from ex
        return transport
    raise AttributeError(f"module 'azure.core.pipeline.transport' has no attribute {name}")
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py
new file mode 100644
index 00000000..32d97ad3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py
@@ -0,0 +1,571 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from __future__ import annotations
+import sys
+from typing import (
+    Any,
+    Optional,
+    AsyncIterator as AsyncIteratorType,
+    TYPE_CHECKING,
+    overload,
+    cast,
+    Union,
+    Type,
+    MutableMapping,
+)
+from types import TracebackType
+from collections.abc import AsyncIterator
+
+import logging
+import asyncio
+import codecs
+import aiohttp
+import aiohttp.client_exceptions
+from multidict import CIMultiDict
+
+from azure.core.configuration import ConnectionConfiguration
+from azure.core.exceptions import (
+    ServiceRequestError,
+    ServiceResponseError,
+    IncompleteReadError,
+)
+from azure.core.pipeline import AsyncPipeline
+
+from ._base import HttpRequest
+from ._base_async import AsyncHttpTransport, AsyncHttpResponse, _ResponseStopIteration
+from ...utils._pipeline_transport_rest_shared import (
+    _aiohttp_body_helper,
+    get_file_items,
+)
+from .._tools import is_rest as _is_rest
+from .._tools_async import (
+    handle_no_stream_rest_response as _handle_no_stream_rest_response,
+)
+
+if TYPE_CHECKING:
+    from ...rest import (
+        HttpRequest as RestHttpRequest,
+        AsyncHttpResponse as RestAsyncHttpResponse,
+    )
+    from ...rest._aiohttp import RestAioHttpTransportResponse
+
+# Matching requests, because why not?
+CONTENT_CHUNK_SIZE = 10 * 1024
+_LOGGER = logging.getLogger(__name__)
+
+
+class AioHttpTransport(AsyncHttpTransport):
+    """AioHttp HTTP sender implementation.
+
+    Fully asynchronous implementation using the aiohttp library.
+
+    :keyword session: The client session.
+    :paramtype session: ~aiohttp.ClientSession
+    :keyword bool session_owner: Session owner. Defaults True.
+
+    :keyword bool use_env_settings: Uses proxy settings from environment. Defaults to True.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/test_example_async.py
+            :start-after: [START aiohttp]
+            :end-before: [END aiohttp]
+            :language: python
+            :dedent: 4
+            :caption: Asynchronous transport with aiohttp.
+    """
+
+    def __init__(
+        self,
+        *,
+        session: Optional[aiohttp.ClientSession] = None,
+        loop=None,
+        session_owner: bool = True,
+        **kwargs,
+    ):
+        if loop and sys.version_info >= (3, 10):
+            raise ValueError("Starting with Python 3.10, asyncio doesn’t support loop as a parameter anymore")
+        self._loop = loop
+        # Whether this transport is responsible for closing the session (see close()).
+        self._session_owner = session_owner
+        self.session = session
+        if not self._session_owner and not self.session:
+            raise ValueError("session_owner cannot be False if no session is provided")
+        self.connection_config = ConnectionConfiguration(**kwargs)
+        # NOTE(review): use_env_settings is popped *after* kwargs was forwarded to
+        # ConnectionConfiguration — presumably ConnectionConfiguration ignores
+        # unknown keyword arguments; confirm.
+        self._use_env_settings = kwargs.pop("use_env_settings", True)
+        # See https://github.com/Azure/azure-sdk-for-python/issues/25640 to understand why we track this
+        self._has_been_opened = False
+
+    async def __aenter__(self):
+        await self.open()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]] = None,
+        exc_value: Optional[BaseException] = None,
+        traceback: Optional[TracebackType] = None,
+    ) -> None:
+        await self.close()
+
+    async def open(self):
+        """Open the transport, creating the owned aiohttp session on first use.
+
+        :raises ValueError: If the transport was already closed, or if no session
+            is available and this transport does not own session creation.
+        """
+        # A transport that was opened and then closed has session == None but
+        # _has_been_opened == True: fail fast instead of silently reopening.
+        if self._has_been_opened and not self.session:
+            raise ValueError(
+                "HTTP transport has already been closed. "
+                "You may check if you're calling a function outside of the `async with` of your client creation, "
+                "or if you called `await close()` on your client already."
+            )
+        if not self.session:
+            if self._session_owner:
+                jar = aiohttp.DummyCookieJar()
+                clientsession_kwargs = {
+                    "trust_env": self._use_env_settings,
+                    "cookie_jar": jar,
+                    # Disable aiohttp's transparent decompression; the response
+                    # wrappers decompress instead (see "decompress" in send()).
+                    "auto_decompress": False,
+                }
+                if self._loop is not None:
+                    clientsession_kwargs["loop"] = self._loop
+                self.session = aiohttp.ClientSession(**clientsession_kwargs)
+            else:
+                raise ValueError("session_owner cannot be False and no session is available")
+
+        self._has_been_opened = True
+        await self.session.__aenter__()
+
+    async def close(self):
+        """Closes the connection.
+
+        Only closes the session when this transport owns it. The session
+        reference is dropped while ``_has_been_opened`` stays True, so a later
+        ``open()`` raises instead of silently recreating a session.
+        """
+        if self._session_owner and self.session:
+            await self.session.close()
+            self.session = None
+
+    def _build_ssl_config(self, cert, verify):
+        """Build the SSL configuration.
+
+        :param tuple cert: Cert information
+        :param bool verify: SSL verification or path to CA file or directory
+        :rtype: bool or str or ssl.SSLContext
+        :return: SSL Configuration
+        """
+        ssl_ctx = None
+
+        # A client cert, or a "verify" value that is a CA bundle path rather
+        # than a plain bool, requires building a real SSLContext.
+        if cert or verify not in (True, False):
+            import ssl
+
+            if verify not in (True, False):
+                ssl_ctx = ssl.create_default_context(cafile=verify)
+            else:
+                ssl_ctx = ssl.create_default_context()
+            if cert:
+                ssl_ctx.load_cert_chain(*cert)
+            return ssl_ctx
+        return verify
+
+    def _get_request_data(self, request):
+        """Get the request data.
+
+        :param request: The request object
+        :type request: ~azure.core.pipeline.transport.HttpRequest or ~azure.core.rest.HttpRequest
+        :rtype: bytes or ~aiohttp.FormData
+        :return: The request data
+        """
+        if request.files:
+            form_data = aiohttp.FormData(request.data or {})
+            for form_file, data in get_file_items(request.files):
+                # Each file entry is a (filename, file_value[, content_type]) tuple.
+                content_type = data[2] if len(data) > 2 else None
+                try:
+                    form_data.add_field(form_file, data[1], filename=data[0], content_type=content_type)
+                except IndexError as err:
+                    raise ValueError("Invalid formdata formatting: {}".format(data)) from err
+            return form_data
+        return request.data
+
+    @overload
+    async def send(
+        self,
+        request: HttpRequest,
+        *,
+        stream: bool = False,
+        proxies: Optional[MutableMapping[str, str]] = None,
+        **config: Any,
+    ) -> AsyncHttpResponse:
+        """Send the request using this HTTP sender.
+
+        Will pre-load the body into memory to be available with a sync method.
+        Pass stream=True to avoid this behavior.
+
+        :param request: The HttpRequest object
+        :type request: ~azure.core.pipeline.transport.HttpRequest
+        :return: The AsyncHttpResponse
+        :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse
+
+        :keyword bool stream: Defaults to False.
+        :keyword MutableMapping proxies: dict of proxy to used based on protocol. Proxy is a dict (protocol, url)
+        """
+
+    @overload
+    async def send(
+        self,
+        request: RestHttpRequest,
+        *,
+        stream: bool = False,
+        proxies: Optional[MutableMapping[str, str]] = None,
+        **config: Any,
+    ) -> RestAsyncHttpResponse:
+        """Send the `azure.core.rest` request using this HTTP sender.
+
+        Will pre-load the body into memory to be available with a sync method.
+        Pass stream=True to avoid this behavior.
+
+        :param request: The HttpRequest object
+        :type request: ~azure.core.rest.HttpRequest
+        :return: The AsyncHttpResponse
+        :rtype: ~azure.core.rest.AsyncHttpResponse
+
+        :keyword bool stream: Defaults to False.
+        :keyword MutableMapping proxies: dict of proxy to used based on protocol. Proxy is a dict (protocol, url)
+        """
+
+    async def send(
+        self,
+        request: Union[HttpRequest, RestHttpRequest],
+        *,
+        stream: bool = False,
+        proxies: Optional[MutableMapping[str, str]] = None,
+        **config,
+    ) -> Union[AsyncHttpResponse, RestAsyncHttpResponse]:
+        """Send the request using this HTTP sender.
+
+        Will pre-load the body into memory to be available with a sync method.
+        Pass stream=True to avoid this behavior.
+
+        :param request: The HttpRequest object
+        :type request: ~azure.core.rest.HttpRequest
+        :return: The AsyncHttpResponse
+        :rtype: ~azure.core.rest.AsyncHttpResponse
+
+        :keyword bool stream: Defaults to False.
+        :keyword MutableMapping proxies: dict of proxy to used based on protocol. Proxy is a dict (protocol, url)
+        """
+        await self.open()
+        try:
+            auto_decompress = self.session.auto_decompress  # type: ignore
+        except AttributeError:
+            # auto_decompress is introduced in aiohttp 3.7. We need this to handle aiohttp 3.6-.
+            auto_decompress = False
+
+        proxy = config.pop("proxy", None)
+        if proxies and not proxy:
+            # aiohttp needs a single proxy, so iterating until we found the right protocol
+
+            # Sort by longest string first, so "http" is not used for "https" ;-)
+            for protocol in sorted(proxies.keys(), reverse=True):
+                if request.url.startswith(protocol):
+                    proxy = proxies[protocol]
+                    break
+
+        response: Optional[Union[AsyncHttpResponse, RestAsyncHttpResponse]] = None
+        ssl = self._build_ssl_config(
+            cert=config.pop("connection_cert", self.connection_config.cert),
+            verify=config.pop("connection_verify", self.connection_config.verify),
+        )
+        # If ssl=True, we just use default ssl context from aiohttp
+        if ssl is not True:
+            config["ssl"] = ssl
+        # If we know for sure there is not body, disable "auto content type"
+        # Otherwise, aiohttp will send "application/octet-stream" even for empty POST request
+        # and that break services like storage signature
+        if not request.data and not request.files:
+            config["skip_auto_headers"] = ["Content-Type"]
+        try:
+            stream_response = stream
+            timeout = config.pop("connection_timeout", self.connection_config.timeout)
+            read_timeout = config.pop("read_timeout", self.connection_config.read_timeout)
+            socket_timeout = aiohttp.ClientTimeout(sock_connect=timeout, sock_read=read_timeout)
+            result = await self.session.request(  # type: ignore
+                request.method,
+                request.url,
+                headers=request.headers,
+                data=self._get_request_data(request),
+                timeout=socket_timeout,
+                allow_redirects=False,
+                proxy=proxy,
+                **config,
+            )
+            if _is_rest(request):
+                from azure.core.rest._aiohttp import RestAioHttpTransportResponse
+
+                response = RestAioHttpTransportResponse(
+                    request=request,
+                    internal_response=result,
+                    block_size=self.connection_config.data_block_size,
+                    decompress=not auto_decompress,
+                )
+                if not stream_response:
+                    await _handle_no_stream_rest_response(response)
+            else:
+                # Given the associated "if", this else is legacy implementation
+                # but mypy do not know it, so using a cast
+                request = cast(HttpRequest, request)
+                response = AioHttpTransportResponse(
+                    request,
+                    result,
+                    self.connection_config.data_block_size,
+                    decompress=not auto_decompress,
+                )
+                if not stream_response:
+                    await response.load_body()
+        except AttributeError as err:
+            # A closed transport has self.session == None (see close()); give a
+            # clearer error than the raw AttributeError from session.request.
+            if self.session is None:
+                raise ValueError(
+                    "No session available for request. "
+                    "Please report this issue to https://github.com/Azure/azure-sdk-for-python/issues."
+                ) from err
+            raise
+        except aiohttp.client_exceptions.ClientResponseError as err:
+            raise ServiceResponseError(err, error=err) from err
+        except asyncio.TimeoutError as err:
+            raise ServiceResponseError(err, error=err) from err
+        except aiohttp.client_exceptions.ClientError as err:
+            raise ServiceRequestError(err, error=err) from err
+        return response
+
+
+class AioHttpStreamDownloadGenerator(AsyncIterator):
+    """Streams the response body data.
+
+    :param pipeline: The pipeline object
+    :type pipeline: ~azure.core.pipeline.AsyncPipeline
+    :param response: The client response object.
+    :type response: ~azure.core.rest.AsyncHttpResponse
+    :keyword bool decompress: If True which is default, will attempt to decode the body based
+        on the *content-encoding* header.
+    """
+
+    @overload
+    def __init__(
+        self,
+        pipeline: AsyncPipeline[HttpRequest, AsyncHttpResponse],
+        response: AioHttpTransportResponse,
+        *,
+        decompress: bool = True,
+    ) -> None: ...
+
+    @overload
+    def __init__(
+        self,
+        pipeline: AsyncPipeline[RestHttpRequest, RestAsyncHttpResponse],
+        response: RestAioHttpTransportResponse,
+        *,
+        decompress: bool = True,
+    ) -> None: ...
+
+    def __init__(
+        self,
+        pipeline: AsyncPipeline,
+        response: Union[AioHttpTransportResponse, RestAioHttpTransportResponse],
+        *,
+        decompress: bool = True,
+    ) -> None:
+        self.pipeline = pipeline
+        self.request = response.request
+        self.response = response
+        self.block_size = response.block_size
+        self._decompress = decompress
+        internal_response = response.internal_response
+        # A missing Content-Length header yields a reported length of 0.
+        self.content_length = int(internal_response.headers.get("Content-Length", 0))
+        # Lazily created zlib decompressor; only built when Content-Encoding is
+        # gzip or deflate and decompression was requested (see __anext__).
+        self._decompressor = None
+
+    def __len__(self):
+        return self.content_length
+
+    async def __anext__(self):
+        internal_response = self.response.internal_response
+        try:
+            chunk = await internal_response.content.read(self.block_size)
+            if not chunk:
+                raise _ResponseStopIteration()
+            if not self._decompress:
+                return chunk
+            enc = internal_response.headers.get("Content-Encoding")
+            if not enc:
+                return chunk
+            enc = enc.lower()
+            if enc in ("gzip", "deflate"):
+                if not self._decompressor:
+                    import zlib
+
+                    # 16 + MAX_WBITS expects a gzip header/trailer;
+                    # -MAX_WBITS decodes a raw deflate stream.
+                    zlib_mode = (16 + zlib.MAX_WBITS) if enc == "gzip" else -zlib.MAX_WBITS
+                    self._decompressor = zlib.decompressobj(wbits=zlib_mode)
+                chunk = self._decompressor.decompress(chunk)
+            return chunk
+        except _ResponseStopIteration:
+            internal_response.close()
+            raise StopAsyncIteration()  # pylint: disable=raise-missing-from
+        except aiohttp.client_exceptions.ClientPayloadError as err:
+            # This is the case that server closes connection before we finish the reading. aiohttp library
+            # raises ClientPayloadError.
+            _LOGGER.warning("Incomplete download: %s", err)
+            internal_response.close()
+            raise IncompleteReadError(err, error=err) from err
+        except aiohttp.client_exceptions.ClientResponseError as err:
+            raise ServiceResponseError(err, error=err) from err
+        except asyncio.TimeoutError as err:
+            raise ServiceResponseError(err, error=err) from err
+        except aiohttp.client_exceptions.ClientError as err:
+            raise ServiceRequestError(err, error=err) from err
+        except Exception as err:
+            _LOGGER.warning("Unable to stream download: %s", err)
+            internal_response.close()
+            raise
+
+
+class AioHttpTransportResponse(AsyncHttpResponse):
+    """Methods for accessing response body data.
+
+    :param request: The HttpRequest object
+    :type request: ~azure.core.pipeline.transport.HttpRequest
+    :param aiohttp_response: Returned from ClientSession.request().
+    :type aiohttp_response: aiohttp.ClientResponse object
+    :param block_size: block size of data sent over connection.
+    :type block_size: int
+    :keyword bool decompress: If True which is default, will attempt to decode the body based
+            on the *content-encoding* header.
+    """
+
+    def __init__(
+        self,
+        request: HttpRequest,
+        aiohttp_response: aiohttp.ClientResponse,
+        block_size: Optional[int] = None,
+        *,
+        decompress: bool = True,
+    ) -> None:
+        super(AioHttpTransportResponse, self).__init__(request, aiohttp_response, block_size=block_size)
+        # https://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse
+        self.status_code = aiohttp_response.status
+        self.headers = CIMultiDict(aiohttp_response.headers)
+        self.reason = aiohttp_response.reason
+        self.content_type = aiohttp_response.headers.get("content-type")
+        # Raw body bytes; populated by load_body().
+        self._content = None
+        # State shared with _aiohttp_body_helper (see body()); presumably tracks
+        # whether _content has already been decompressed — confirm in helper.
+        self._decompressed_content = False
+        self._decompress = decompress
+
+    def body(self) -> bytes:
+        """Return the whole body as bytes in memory.
+
+        :rtype: bytes
+        :return: The whole response body.
+        """
+        return _aiohttp_body_helper(self)
+
+    def text(self, encoding: Optional[str] = None) -> str:
+        """Return the whole body as a string.
+
+        If encoding is not provided, rely on aiohttp auto-detection.
+
+        :param str encoding: The encoding to apply.
+        :rtype: str
+        :return: The whole response body as a string.
+        """
+        # super().text detects charset based on self._content() which is compressed
+        # implement the decoding explicitly here
+        body = self.body()
+
+        ctype = self.headers.get(aiohttp.hdrs.CONTENT_TYPE, "").lower()
+        mimetype = aiohttp.helpers.parse_mimetype(ctype)
+
+        if not encoding:
+            # extract encoding from mimetype, if caller does not specify
+            encoding = mimetype.parameters.get("charset")
+        if encoding:
+            # Discard unknown/invalid charsets and fall through to detection.
+            try:
+                codecs.lookup(encoding)
+            except LookupError:
+                encoding = None
+        if not encoding:
+            if mimetype.type == "application" and mimetype.subtype in ["json", "rdap"]:
+                # RFC 7159 states that the default encoding is UTF-8.
+                # RFC 7483 defines application/rdap+json
+                encoding = "utf-8"
+            elif body is None:
+                raise RuntimeError("Cannot guess the encoding of a not yet read body")
+            else:
+                try:
+                    import cchardet as chardet
+                except ImportError:  # pragma: no cover
+                    try:
+                        import chardet  # type: ignore
+                    except ImportError:  # pragma: no cover
+                        import charset_normalizer as chardet  # type: ignore[no-redef]
+                # While "detect" can return a dict of float, in this context this won't happen
+                # The cast is for pyright to be happy
+                encoding = cast(Optional[str], chardet.detect(body)["encoding"])
+        if encoding == "utf-8" or encoding is None:
+            # utf-8-sig strips a leading BOM when present and otherwise decodes
+            # exactly like utf-8.
+            encoding = "utf-8-sig"
+
+        return body.decode(encoding)
+
+    async def load_body(self) -> None:
+        """Load in memory the body, so it could be accessible from sync methods."""
+        try:
+            self._content = await self.internal_response.read()
+        except aiohttp.client_exceptions.ClientPayloadError as err:
+            # This is the case that server closes connection before we finish the reading. aiohttp library
+            # raises ClientPayloadError.
+            raise IncompleteReadError(err, error=err) from err
+        except aiohttp.client_exceptions.ClientResponseError as err:
+            raise ServiceResponseError(err, error=err) from err
+        except asyncio.TimeoutError as err:
+            raise ServiceResponseError(err, error=err) from err
+        except aiohttp.client_exceptions.ClientError as err:
+            raise ServiceRequestError(err, error=err) from err
+
+    def stream_download(
+        self,
+        pipeline: AsyncPipeline[HttpRequest, AsyncHttpResponse],
+        *,
+        decompress: bool = True,
+        **kwargs,
+    ) -> AsyncIteratorType[bytes]:
+        """Generator for streaming response body data.
+
+        :param pipeline: The pipeline object
+        :type pipeline: azure.core.pipeline.AsyncPipeline
+        :keyword bool decompress: If True which is default, will attempt to decode the body based
+            on the *content-encoding* header.
+        :rtype: AsyncIterator[bytes]
+        :return: An iterator of bytes chunks.
+        """
+        return AioHttpStreamDownloadGenerator(pipeline, self, decompress=decompress, **kwargs)
+
+    def __getstate__(self):
+        # Be sure body is loaded in memory, otherwise not pickable and let it throw
+        self.body()
+
+        state = self.__dict__.copy()
+        # Remove the unpicklable entries.
+        state["internal_response"] = None  # aiohttp response are not pickable (see headers comments)
+        state["headers"] = CIMultiDict(self.headers)  # MultiDictProxy is not pickable
+        return state
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base.py
new file mode 100644
index 00000000..eb3d8fdf
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base.py
@@ -0,0 +1,863 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from __future__ import annotations
+import abc
+from email.message import Message
+import json
+import logging
+import time
+import copy
+from urllib.parse import urlparse
+import xml.etree.ElementTree as ET
+
+from typing import (
+    Generic,
+    TypeVar,
+    IO,
+    Union,
+    Any,
+    Mapping,
+    Optional,
+    Tuple,
+    Iterator,
+    Type,
+    Dict,
+    List,
+    Sequence,
+    MutableMapping,
+    ContextManager,
+    TYPE_CHECKING,
+)
+
+from http.client import HTTPResponse as _HTTPResponse
+
+from azure.core.exceptions import HttpResponseError
+from azure.core.pipeline.policies import SansIOHTTPPolicy
+from ...utils._utils import case_insensitive_dict
+from ...utils._pipeline_transport_rest_shared import (
+    _format_parameters_helper,
+    _prepare_multipart_body_helper,
+    _serialize_request,
+    _format_data_helper,
+    BytesIOSocket,
+    _decode_parts_helper,
+    _get_raw_parts_helper,
+    _parts_helper,
+)
+
+
# Type variables parameterizing transports/pipelines over the concrete
# request/response implementations.
HTTPResponseType = TypeVar("HTTPResponseType")
HTTPRequestType = TypeVar("HTTPRequestType")
# Accepted shapes for a request body: raw bytes, text, or a form dict.
DataType = Union[bytes, str, Dict[str, Union[str, int]]]

if TYPE_CHECKING:
    # We need a transport to define a pipeline, this "if" avoid a circular import
    from azure.core.pipeline import Pipeline
    from azure.core.rest._helpers import FileContent

_LOGGER = logging.getLogger(__name__)

# NOTE(review): legacy alias (Python 2 era leftover); used by
# HttpRequest.set_streamed_data_body as the "already a string/bytes-like" check.
binary_type = str
+
+
+def _format_url_section(template, **kwargs: Dict[str, str]) -> str:
+    """String format the template with the kwargs, auto-skip sections of the template that are NOT in the kwargs.
+
+    By default in Python, "format" will raise a KeyError if a template element is not found. Here the section between
+    the slashes will be removed from the template instead.
+
+    This is used for API like Storage, where when Swagger has template section not defined as parameter.
+
+    :param str template: a string template to fill
+    :rtype: str
+    :returns: Template completed
+    """
+    last_template = template
+    components = template.split("/")
+    while components:
+        try:
+            return template.format(**kwargs)
+        except KeyError as key:
+            formatted_components = template.split("/")
+            components = [c for c in formatted_components if "{{{}}}".format(key.args[0]) not in c]
+            template = "/".join(components)
+            if last_template == template:
+                raise ValueError(
+                    f"The value provided for the url part '{template}' was incorrect, and resulted in an invalid url"
+                ) from key
+            last_template = template
+    return last_template
+
+
+def _urljoin(base_url: str, stub_url: str) -> str:
+    """Append to end of base URL without losing query parameters.
+
+    :param str base_url: The base URL.
+    :param str stub_url: Section to append to the end of the URL path.
+    :returns: The updated URL.
+    :rtype: str
+    """
+    parsed_base_url = urlparse(base_url)
+
+    # Can't use "urlparse" on a partial url, we get incorrect parsing for things like
+    # document:build?format=html&api-version=2019-05-01
+    split_url = stub_url.split("?", 1)
+    stub_url_path = split_url.pop(0)
+    stub_url_query = split_url.pop() if split_url else None
+
+    # Note that _replace is a public API named that way to avoid conflicts in namedtuple
+    # https://docs.python.org/3/library/collections.html?highlight=namedtuple#collections.namedtuple
+    parsed_base_url = parsed_base_url._replace(
+        path=parsed_base_url.path.rstrip("/") + "/" + stub_url_path,
+    )
+    if stub_url_query:
+        query_params = [stub_url_query]
+        if parsed_base_url.query:
+            query_params.insert(0, parsed_base_url.query)
+        parsed_base_url = parsed_base_url._replace(query="&".join(query_params))
+    return parsed_base_url.geturl()
+
+
class HttpTransport(ContextManager["HttpTransport"], abc.ABC, Generic[HTTPRequestType, HTTPResponseType]):
    """An http sender ABC.

    Concrete transports implement ``send``/``open``/``close``; the class is also a
    context manager so a session can be scoped with ``with``.
    """

    @abc.abstractmethod
    def send(self, request: HTTPRequestType, **kwargs: Any) -> HTTPResponseType:
        """Send the request using this HTTP sender.

        :param request: The pipeline request object
        :type request: ~azure.core.transport.HTTPRequest
        :return: The pipeline response object.
        :rtype: ~azure.core.pipeline.transport.HttpResponse
        """

    @abc.abstractmethod
    def open(self) -> None:
        """Assign new session if one does not already exist."""

    @abc.abstractmethod
    def close(self) -> None:
        """Close the session if it is not externally owned."""

    def sleep(self, duration: float) -> None:
        """Sleep for the specified duration.

        You should always ask the transport to sleep, and not call directly
        the stdlib. This is mostly important in async, as the transport
        may not use asyncio but other implementations like trio and they have their own
        way to sleep, but to keep design
        consistent, it's cleaner to always ask the transport to sleep and let the transport
        implementor decide how to do it.

        :param float duration: The number of seconds to sleep.
        """
        time.sleep(duration)
+
+
class HttpRequest:
    """Represents an HTTP request.

    URL can be given without query parameters, to be added later using "format_parameters".

    :param str method: HTTP method (GET, HEAD, etc.)
    :param str url: At least complete scheme/host/path
    :param dict[str,str] headers: HTTP headers
    :param files: Dictionary of ``'name': file-like-objects`` (or ``{'name': file-tuple}``) for multipart
        encoding upload. ``file-tuple`` can be a 2-tuple ``('filename', fileobj)``, 3-tuple
        ``('filename', fileobj, 'content_type')`` or a 4-tuple
        ``('filename', fileobj, 'content_type', custom_headers)``, where ``'content_type'`` is a string
        defining the content type of the given file and ``custom_headers``
        a dict-like object containing additional headers to add for the file.
    :type files: dict[str, tuple[str, IO, str, dict]] or dict[str, IO]
    :param data: Body to be sent.
    :type data: bytes or dict (for form)
    """

    def __init__(
        self,
        method: str,
        url: str,
        headers: Optional[Mapping[str, str]] = None,
        files: Optional[Any] = None,
        data: Optional[DataType] = None,
    ) -> None:
        self.method = method
        self.url = url
        # Case-insensitive so lookups like "Content-Type" work regardless of casing.
        self.headers: MutableMapping[str, str] = case_insensitive_dict(headers)
        self.files: Optional[Any] = files
        self.data: Optional[DataType] = data
        self.multipart_mixed_info: Optional[Tuple[Sequence[Any], Sequence[Any], Optional[str], Dict[str, Any]]] = None

    def __repr__(self) -> str:
        return "<HttpRequest [{}], url: '{}'>".format(self.method, self.url)

    def __deepcopy__(self, memo: Optional[Dict[int, Any]] = None) -> "HttpRequest":
        try:
            data = copy.deepcopy(self.body, memo)
            files = copy.deepcopy(self.files, memo)
            request = HttpRequest(self.method, self.url, self.headers, files, data)
            request.multipart_mixed_info = self.multipart_mixed_info
            return request
        except (ValueError, TypeError):
            # Bodies such as open streams cannot be deep-copied; fall back to a shallow copy.
            return copy.copy(self)

    @property
    def query(self) -> Dict[str, str]:
        """The query parameters of the request as a dict.

        :rtype: dict[str, str]
        :return: The query parameters of the request as a dict.
        """
        query = urlparse(self.url).query
        if query:
            # partition on the first "=" so values may themselves contain "=".
            return {p[0]: p[-1] for p in [p.partition("=") for p in query.split("&")]}
        return {}

    @property
    def body(self) -> Optional[DataType]:
        """Alias to data.

        :rtype: bytes or str or dict or None
        :return: The body of the request.
        """
        return self.data

    @body.setter
    def body(self, value: Optional[DataType]) -> None:
        self.data = value

    @staticmethod
    def _format_data(data: Union[str, IO]) -> Union[Tuple[Optional[str], str], Tuple[Optional[str], FileContent, str]]:
        """Format field data according to whether it is a stream or
        a string for a form-data request.

        :param data: The request field data.
        :type data: str or file-like object.
        :rtype: tuple[str, IO, str] or tuple[None, str]
        :return: A tuple of (data name, data IO, "application/octet-stream") or (None, data str)
        """
        return _format_data_helper(data)

    def format_parameters(self, params: Dict[str, str]) -> None:
        """Format parameters into a valid query string.
        It's assumed all parameters have already been quoted as
        valid URL strings.

        :param dict params: A dictionary of parameters.
        """
        return _format_parameters_helper(self, params)

    def set_streamed_data_body(self, data: Any) -> None:
        """Set a streamable data body.

        :param data: The request field data.
        :type data: stream or generator or asyncgenerator
        :raises TypeError: If the data is not a stream or iterable.
        """
        if not isinstance(data, binary_type) and not any(
            hasattr(data, attr) for attr in ["read", "__iter__", "__aiter__"]
        ):
            raise TypeError("A streamable data source must be an open file-like object or iterable.")
        self.data = data
        self.files = None

    def set_text_body(self, data: str) -> None:
        """Set a text as body of the request.

        :param data: A text to send as body.
        :type data: str
        """
        if data is None:
            self.data = None
        else:
            self.data = data
            self.headers["Content-Length"] = str(len(self.data))
        self.files = None

    def set_xml_body(self, data: Any) -> None:
        """Set an XML element tree as the body of the request.

        :param data: The request field data.
        :type data: XML node
        """
        if data is None:
            self.data = None
        else:
            bytes_data: bytes = ET.tostring(data, encoding="utf8")
            # ElementTree emits "utf8"; normalize to the IANA-registered "utf-8" name.
            self.data = bytes_data.replace(b"encoding='utf8'", b"encoding='utf-8'")
            self.headers["Content-Length"] = str(len(self.data))
        self.files = None

    def set_json_body(self, data: Any) -> None:
        """Set a JSON-friendly object as the body of the request.

        :param data: A JSON serializable object
        :type data: dict
        """
        if data is None:
            self.data = None
        else:
            self.data = json.dumps(data)
            self.headers["Content-Length"] = str(len(self.data))
        self.files = None

    def set_formdata_body(self, data: Optional[Dict[str, str]] = None) -> None:
        """Set form-encoded data as the body of the request.

        :param data: The request field data.
        :type data: dict
        """
        if data is None:
            data = {}
        content_type = self.headers.pop("Content-Type", None) if self.headers else None

        if content_type and content_type.lower() == "application/x-www-form-urlencoded":
            # None values are dropped rather than serialized.
            self.data = {f: d for f, d in data.items() if d is not None}
            self.files = None
        else:  # Assume "multipart/form-data"
            self.files = {f: self._format_data(d) for f, d in data.items() if d is not None}
            self.data = None

    def set_bytes_body(self, data: bytes) -> None:
        """Set generic bytes as the body of the request.

        Will set content-length.

        :param data: The request field data.
        :type data: bytes
        """
        if data:
            self.headers["Content-Length"] = str(len(data))
        self.data = data
        self.files = None

    def set_multipart_mixed(
        self,
        *requests: "HttpRequest",
        policies: Optional[List[SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]]] = None,
        boundary: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Set the part of a multipart/mixed.

        Only supported args for now are HttpRequest objects.

        boundary is optional, and one will be generated if you don't provide one.
        Note that no verification are made on the boundary, this is considered advanced
        enough so you know how to respect RFC1341 7.2.1 and provide a correct boundary.

        Any additional kwargs will be passed into the pipeline context for per-request policy
        configuration.

        :param requests: The requests to add to the multipart/mixed
        :type requests: ~azure.core.pipeline.transport.HttpRequest
        :keyword list[SansIOHTTPPolicy] policies: SansIOPolicy to apply at preparation time
        :keyword str boundary: Optional boundary
        """
        policies = policies or []
        # Stored only; actual serialization happens in prepare_multipart_body.
        self.multipart_mixed_info = (
            requests,
            policies,
            boundary,
            kwargs,
        )

    def prepare_multipart_body(self, content_index: int = 0) -> int:
        """Will prepare the body of this request according to the multipart information.

        This call assumes the on_request policies have been applied already in their
        correct context (sync/async)

        Does nothing if "set_multipart_mixed" was never called.

        :param int content_index: The current index of parts within the batch message.
        :returns: The updated index after all parts in this request have been added.
        :rtype: int
        """
        return _prepare_multipart_body_helper(self, content_index)

    def serialize(self) -> bytes:
        """Serialize this request using application/http spec.

        :rtype: bytes
        :return: The requests serialized as HTTP low-level message in bytes.
        """
        return _serialize_request(self)
+
+
class _HttpResponseBase:
    """Represent a HTTP response.

    No body is defined here on purpose, since async pipeline
    will provide async ways to access the body
    Full in-memory using "body" as bytes.

    :param request: The request.
    :type request: ~azure.core.pipeline.transport.HttpRequest
    :param internal_response: The object returned from the HTTP library.
    :type internal_response: any
    :param int block_size: Defaults to 4096 bytes.
    """

    def __init__(
        self,
        request: "HttpRequest",
        internal_response: Any,
        block_size: Optional[int] = None,
    ) -> None:
        self.request: HttpRequest = request
        self.internal_response = internal_response
        # This is actually never None, and set by all implementations after the call to
        # __init__ of this class. This class is also a legacy impl, so it's risky to change it
        # for low benefits The new "rest" implementation does define correctly status_code
        # as non-optional.
        self.status_code: int = None  # type: ignore
        self.headers: MutableMapping[str, str] = {}
        self.reason: Optional[str] = None
        self.content_type: Optional[str] = None
        self.block_size: int = block_size or 4096  # Default to same as Requests

    def body(self) -> bytes:
        """Return the whole body as bytes in memory.

        Sync implementer should load the body in memory if they can.
        Async implementer should rely on async load_body to have been called first.

        :rtype: bytes
        :return: The whole body as bytes in memory.
        """
        raise NotImplementedError()

    def text(self, encoding: Optional[str] = None) -> str:
        """Return the whole body as a string.

        .. seealso:: ~body()

        :param str encoding: The encoding to apply. If None, use "utf-8" with BOM parsing (utf-8-sig).
         Implementation can be smarter if they want (using headers or chardet).
        :rtype: str
        :return: The whole body as a string.
        """
        # "utf-8-sig" transparently strips a leading BOM if one is present.
        if encoding == "utf-8" or encoding is None:
            encoding = "utf-8-sig"
        return self.body().decode(encoding)

    def _decode_parts(
        self,
        message: Message,
        http_response_type: Type["_HttpResponseBase"],
        requests: Sequence[HttpRequest],
    ) -> List["HttpResponse"]:
        """Rebuild an HTTP response from pure string.

        :param ~email.message.Message message: The HTTP message as an email object
        :param type http_response_type: The type of response to return
        :param list[HttpRequest] requests: The requests that were batched together
        :rtype: list[HttpResponse]
        :return: The list of HttpResponse
        """
        return _decode_parts_helper(self, message, http_response_type, requests, _deserialize_response)

    def _get_raw_parts(
        self, http_response_type: Optional[Type["_HttpResponseBase"]] = None
    ) -> Iterator["HttpResponse"]:
        """Assuming this body is multipart, return the iterator or parts.

        If parts are application/http use http_response_type or HttpClientTransportResponse
        as envelope.

        :param type http_response_type: The type of response to return
        :rtype: iterator[HttpResponse]
        :return: The iterator of HttpResponse
        """
        return _get_raw_parts_helper(self, http_response_type or HttpClientTransportResponse)

    def raise_for_status(self) -> None:
        """Raises an HttpResponseError if the response has an error status code.
        If response is good, does nothing.
        """
        # A falsy status_code (0/None, i.e. never set) is treated as an error too.
        if not self.status_code or self.status_code >= 400:
            raise HttpResponseError(response=self)

    def __repr__(self) -> str:
        content_type_str = ", Content-Type: {}".format(self.content_type) if self.content_type else ""
        return "<{}: {} {}{}>".format(type(self).__name__, self.status_code, self.reason, content_type_str)
+
+
class HttpResponse(_HttpResponseBase):
    """Synchronous HTTP response, adding streaming-download and multipart helpers
    on top of :class:`_HttpResponseBase`."""

    def stream_download(self, pipeline: Pipeline[HttpRequest, "HttpResponse"], **kwargs: Any) -> Iterator[bytes]:
        """Generator for streaming request body data.

        Should be implemented by sub-classes if streaming download
        is supported.

        :param pipeline: The pipeline object
        :type pipeline: ~azure.core.pipeline.Pipeline
        :rtype: iterator[bytes]
        :return: The generator of bytes connected to the socket
        """
        raise NotImplementedError("stream_download is not implemented.")

    def parts(self) -> Iterator["HttpResponse"]:
        """Assuming the content-type is multipart/mixed, will return the parts as an iterator.

        :rtype: iterator[HttpResponse]
        :return: The iterator of HttpResponse if request was multipart/mixed
        :raises ValueError: If the content is not multipart/mixed
        """
        return _parts_helper(self)
+
+
class _HttpClientTransportResponse(_HttpResponseBase):
    """Create a HTTPResponse from an http.client response.

    Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary.

    :param HttpRequest request: The request.
    :param httpclient_response: The object returned from an HTTP(S)Connection from http.client
    :type httpclient_response: http.client.HTTPResponse
    """

    def __init__(self, request, httpclient_response):
        super(_HttpClientTransportResponse, self).__init__(request, httpclient_response)
        self.status_code = httpclient_response.status
        self.headers = case_insensitive_dict(httpclient_response.getheaders())
        self.reason = httpclient_response.reason
        self.content_type = self.headers.get("Content-Type")
        # Lazily-read body cache; populated on the first call to body().
        self.data = None

    def body(self):
        """Read (once) and return the whole body as bytes.

        :rtype: bytes
        :return: The response body.
        """
        if self.data is None:
            self.data = self.internal_response.read()
        return self.data
+
+
class HttpClientTransportResponse(_HttpClientTransportResponse, HttpResponse):  # pylint: disable=abstract-method
    """Create a HTTPResponse from an http.client response.

    Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary.

    Combines the http.client parsing of ``_HttpClientTransportResponse`` with the
    public ``HttpResponse`` interface.
    """
+
+
def _deserialize_response(http_response_as_bytes, http_request, http_response_type=HttpClientTransportResponse):
    """Deserialize a HTTPResponse from a string.

    :param bytes http_response_as_bytes: The HTTP response as bytes.
    :param HttpRequest http_request: The request to store in the response.
    :param type http_response_type: The type of response to return
    :rtype: HttpResponse
    :return: The HTTP response from those low-level bytes.
    """
    # Wrap the raw bytes in a socket-like object so http.client can parse them
    # as if they had just arrived off the wire.
    mock_socket = BytesIOSocket(http_response_as_bytes)
    raw_response = _HTTPResponse(mock_socket, method=http_request.method)
    raw_response.begin()
    return http_response_type(http_request, raw_response)
+
+
class PipelineClientBase:
    """Base class for pipeline clients.

    :param str base_url: URL for the request.
    """

    def __init__(self, base_url: str):
        self._base_url = base_url

    def _request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, str]],
        headers: Optional[Dict[str, str]],
        content: Any,
        form_content: Optional[Dict[str, Any]],
        stream_content: Any,
    ) -> HttpRequest:
        """Create HttpRequest object.

        If content is not None, guesses will be used to set the right body:
        - If content is an XML tree, will serialize as XML
        - If content-type starts by "text/", set the content as text
        - Else, try JSON serialization

        :param str method: HTTP method (GET, HEAD, etc.)
        :param str url: URL for the request.
        :param dict params: URL query parameters.
        :param dict headers: Headers
        :param content: The body content
        :type content: bytes or str or dict
        :param dict form_content: Form content
        :param stream_content: The body content as a stream
        :type stream_content: stream or generator or asyncgenerator
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = HttpRequest(method, self.format_url(url))

        if params:
            request.format_parameters(params)

        if headers:
            request.headers.update(headers)

        if content is not None:
            content_type = request.headers.get("Content-Type")
            if isinstance(content, ET.Element):
                request.set_xml_body(content)
            # https://github.com/Azure/azure-sdk-for-python/issues/12137
            # A string is valid JSON, make the difference between text
            # and a plain JSON string.
            # Content-Type is a good indicator of intent from user
            elif content_type and content_type.startswith("text/"):
                request.set_text_body(content)
            else:
                try:
                    request.set_json_body(content)
                except TypeError:
                    # Not JSON-serializable: pass the content through as-is.
                    request.data = content

        if form_content:
            request.set_formdata_body(form_content)
        elif stream_content:
            request.set_streamed_data_body(stream_content)

        return request

    def format_url(self, url_template: str, **kwargs: Any) -> str:
        """Format request URL with the client base URL, unless the
        supplied URL is already absolute.

        Note that both the base url and the template url can contain query parameters.

        :param str url_template: The request URL to be formatted if necessary.
        :rtype: str
        :return: The formatted URL.
        :raises ValueError: If a kwarg needed by the base url is missing.
        """
        url = _format_url_section(url_template, **kwargs)
        if url:
            parsed = urlparse(url)
            # Only join with the base url when the template is not already absolute.
            if not parsed.scheme or not parsed.netloc:
                url = url.lstrip("/")
                try:
                    base = self._base_url.format(**kwargs).rstrip("/")
                except KeyError as key:
                    err_msg = "The value provided for the url part {} was incorrect, and resulted in an invalid url"
                    raise ValueError(err_msg.format(key.args[0])) from key

                url = _urljoin(base, url)
        else:
            url = self._base_url.format(**kwargs)
        return url

    def get(
        self,
        url: str,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        content: Any = None,
        form_content: Optional[Dict[str, Any]] = None,
    ) -> "HttpRequest":
        """Create a GET request object.

        :param str url: The request URL.
        :param dict params: Request URL parameters.
        :param dict headers: Headers
        :param content: The body content
        :type content: bytes or str or dict
        :param dict form_content: Form content
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = self._request("GET", url, params, headers, content, form_content, None)
        # NOTE(review): historical redundancy — _request already set method to "GET".
        request.method = "GET"
        return request

    def put(
        self,
        url: str,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        content: Any = None,
        form_content: Optional[Dict[str, Any]] = None,
        stream_content: Any = None,
    ) -> HttpRequest:
        """Create a PUT request object.

        :param str url: The request URL.
        :param dict params: Request URL parameters.
        :param dict headers: Headers
        :param content: The body content
        :type content: bytes or str or dict
        :param dict form_content: Form content
        :param stream_content: The body content as a stream
        :type stream_content: stream or generator or asyncgenerator
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = self._request("PUT", url, params, headers, content, form_content, stream_content)
        return request

    def post(
        self,
        url: str,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        content: Any = None,
        form_content: Optional[Dict[str, Any]] = None,
        stream_content: Any = None,
    ) -> HttpRequest:
        """Create a POST request object.

        :param str url: The request URL.
        :param dict params: Request URL parameters.
        :param dict headers: Headers
        :param content: The body content
        :type content: bytes or str or dict
        :param dict form_content: Form content
        :param stream_content: The body content as a stream
        :type stream_content: stream or generator or asyncgenerator
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = self._request("POST", url, params, headers, content, form_content, stream_content)
        return request

    def head(
        self,
        url: str,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        content: Any = None,
        form_content: Optional[Dict[str, Any]] = None,
        stream_content: Any = None,
    ) -> HttpRequest:
        """Create a HEAD request object.

        :param str url: The request URL.
        :param dict params: Request URL parameters.
        :param dict headers: Headers
        :param content: The body content
        :type content: bytes or str or dict
        :param dict form_content: Form content
        :param stream_content: The body content as a stream
        :type stream_content: stream or generator or asyncgenerator
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = self._request("HEAD", url, params, headers, content, form_content, stream_content)
        return request

    def patch(
        self,
        url: str,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        content: Any = None,
        form_content: Optional[Dict[str, Any]] = None,
        stream_content: Any = None,
    ) -> HttpRequest:
        """Create a PATCH request object.

        :param str url: The request URL.
        :param dict params: Request URL parameters.
        :param dict headers: Headers
        :param content: The body content
        :type content: bytes or str or dict
        :param dict form_content: Form content
        :param stream_content: The body content as a stream
        :type stream_content: stream or generator or asyncgenerator
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = self._request("PATCH", url, params, headers, content, form_content, stream_content)
        return request

    def delete(
        self,
        url: str,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        content: Any = None,
        form_content: Optional[Dict[str, Any]] = None,
    ) -> HttpRequest:
        """Create a DELETE request object.

        :param str url: The request URL.
        :param dict params: Request URL parameters.
        :param dict headers: Headers
        :param content: The body content
        :type content: bytes or str or dict
        :param dict form_content: Form content
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = self._request("DELETE", url, params, headers, content, form_content, None)
        return request

    def merge(
        self,
        url: str,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        content: Any = None,
        form_content: Optional[Dict[str, Any]] = None,
    ) -> HttpRequest:
        """Create a MERGE request object.

        :param str url: The request URL.
        :param dict params: Request URL parameters.
        :param dict headers: Headers
        :param content: The body content
        :type content: bytes or str or dict
        :param dict form_content: Form content
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = self._request("MERGE", url, params, headers, content, form_content, None)
        return request

    def options(
        self,  # pylint: disable=unused-argument
        url: str,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        *,
        content: Optional[Union[bytes, str, Dict[Any, Any]]] = None,
        form_content: Optional[Dict[Any, Any]] = None,
        **kwargs: Any,
    ) -> HttpRequest:
        """Create a OPTIONS request object.

        :param str url: The request URL.
        :param dict params: Request URL parameters.
        :param dict headers: Headers
        :keyword content: The body content
        :type content: bytes or str or dict
        :keyword dict form_content: Form content
        :return: An HttpRequest object
        :rtype: ~azure.core.pipeline.transport.HttpRequest
        """
        request = self._request("OPTIONS", url, params, headers, content, form_content, None)
        return request
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_async.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_async.py
new file mode 100644
index 00000000..f04d955e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_async.py
@@ -0,0 +1,171 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from __future__ import annotations
+import asyncio
+import abc
+from collections.abc import AsyncIterator
+from typing import (
+    AsyncIterator as AsyncIteratorType,
+    TypeVar,
+    Generic,
+    Any,
+    AsyncContextManager,
+    Optional,
+    Type,
+    TYPE_CHECKING,
+)
+from types import TracebackType
+
+from ._base import _HttpResponseBase, _HttpClientTransportResponse, HttpRequest
+from ...utils._pipeline_transport_rest_shared_async import _PartGenerator
+
+
+AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType")
+HTTPResponseType = TypeVar("HTTPResponseType")
+HTTPRequestType = TypeVar("HTTPRequestType")
+
+if TYPE_CHECKING:
+    # We need a transport to define a pipeline, this "if" avoid a circular import
+    from .._base_async import AsyncPipeline
+
+
class _ResponseStopIteration(Exception):
    """Internal marker raised in place of StopIteration when an iterator driven
    through a Future/executor is exhausted (StopIteration cannot safely cross
    that boundary)."""
+
+
def _iterate_response_content(iterator):
    """Advance *iterator* by one step, translating exhaustion.

    Raising StopIteration into a Future is illegal in Python:
    > TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
    so exhaustion is reported via _ResponseStopIteration instead.

    :param iterator: An iterator
    :type iterator: iterator
    :return: The next item in the iterator
    :rtype: any
    """
    sentinel = object()
    item = next(iterator, sentinel)
    if item is sentinel:
        raise _ResponseStopIteration()
    return item
+
+
class AsyncHttpResponse(_HttpResponseBase, AsyncContextManager["AsyncHttpResponse"]):
    """An AsyncHttpResponse ABC.

    Allows for the asynchronous streaming of data from the response.
    """

    def stream_download(
        self,
        pipeline: AsyncPipeline[HttpRequest, "AsyncHttpResponse"],
        *,
        decompress: bool = True,
        **kwargs: Any,
    ) -> AsyncIteratorType[bytes]:
        """Generator for streaming response body data.

        Sub-classes that support streaming downloads override this and
        return an asynchronous generator.

        :param pipeline: The pipeline object
        :type pipeline: azure.core.pipeline.Pipeline
        :keyword bool decompress: If True which is default, will attempt to decode the body based
            on the *content-encoding* header.
        :return: An async iterator of bytes
        :rtype: AsyncIterator[bytes]
        """
        raise NotImplementedError("stream_download is not implemented.")

    def parts(self) -> AsyncIterator["AsyncHttpResponse"]:
        """Assuming the content-type is multipart/mixed, will return the parts as an async iterator.

        :return: An async iterator of the parts
        :rtype: AsyncIterator
        :raises ValueError: If the content is not multipart/mixed
        """
        content_type = self.content_type or ""
        if not content_type.startswith("multipart/mixed"):
            raise ValueError("You can't get parts if the response is not multipart/mixed")

        return _PartGenerator(self, default_http_response_type=AsyncHttpClientTransportResponse)

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        # Nothing to release by default; sub-classes may override.
        return None
+
+
class AsyncHttpClientTransportResponse(_HttpClientTransportResponse, AsyncHttpResponse):
    """Create a HTTPResponse from an http.client response.

    Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary.

    All behavior is inherited from the two bases; this class only combines the
    http.client response parsing with the async response interface.

    :param HttpRequest request: The request.
    :param httpclient_response: The object returned from an HTTP(S)Connection from http.client
    """
+
+
class AsyncHttpTransport(
    AsyncContextManager["AsyncHttpTransport"],
    abc.ABC,
    Generic[HTTPRequestType, AsyncHTTPResponseType],
):
    """An http sender ABC."""

    @abc.abstractmethod
    async def send(self, request: HTTPRequestType, **kwargs: Any) -> AsyncHTTPResponseType:
        """Send the request using this HTTP sender.

        :param request: The request object. Exact type can be inferred from the pipeline.
        :type request: any
        :return: The response object. Exact type can be inferred from the pipeline.
        :rtype: any
        """

    @abc.abstractmethod
    async def open(self) -> None:
        """Assign new session if one does not already exist."""

    @abc.abstractmethod
    async def close(self) -> None:
        """Close the session if it is not externally owned."""

    async def sleep(self, duration: float) -> None:
        """Sleep for the specified duration.

        You should always ask the transport to sleep, rather than calling the
        stdlib directly. This matters most in async code: a transport may be
        built on something other than asyncio (trio, for example), and each
        framework has its own way to sleep. Routing the call through the
        transport keeps the design consistent and lets the transport
        implementor decide how to sleep.
        By default, this method uses "asyncio", so it does not need to be
        overridden if your transport uses asyncio too.

        :param float duration: The number of seconds to sleep.
        """
        await asyncio.sleep(duration)
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_requests_async.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_requests_async.py
new file mode 100644
index 00000000..15ec81a8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_requests_async.py
@@ -0,0 +1,55 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from typing import Optional, Type
+from types import TracebackType
+from ._requests_basic import RequestsTransport
+from ._base_async import AsyncHttpTransport
+
+
class RequestsAsyncTransportBase(RequestsTransport, AsyncHttpTransport):  # type: ignore
    """Shared plumbing for the requests-based asynchronous transports."""

    async def _retrieve_request_data(self, request):
        # requests cannot consume an async generator, so drain it into a list
        # first and hand requests a plain iterator. Buffering the whole body
        # is not ideal memory-wise, but neither is feeding an async generator
        # to a requests-based transport in the first place.
        if hasattr(request.data, "__aiter__"):
            buffered = [part async for part in request.data]
            return iter(buffered)
        return request.data

    async def __aenter__(self):
        return super().__enter__()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ):
        return super().__exit__(exc_type, exc_value, traceback)
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_bigger_block_size_http_adapters.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_bigger_block_size_http_adapters.py
new file mode 100644
index 00000000..d2096773
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_bigger_block_size_http_adapters.py
@@ -0,0 +1,48 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+
+import sys
+from requests.adapters import HTTPAdapter
+
+
class BiggerBlockSizeHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that sets a larger (32768-byte) socket blocksize on the connection pool."""

    def get_connection(self, url, proxies=None):
        """Returns a urllib3 connection for the given URL. This should not be
        called from user code, and is only exposed for use when subclassing the
        :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

        :param str url: The URL to connect to.
        :param MutableMapping proxies: (optional) A Requests-style dictionary of proxies used on this request.
        :rtype: urllib3.ConnectionPool
        :returns: The urllib3 ConnectionPool for the given URL.
        """
        pool = super().get_connection(url, proxies)
        # The blocksize keyword is only meaningful on Python 3.7+.
        if sys.version_info[:2] >= (3, 7):
            if not pool.conn_kw:
                pool.conn_kw = {}
            pool.conn_kw["blocksize"] = 32768
        return pool
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_asyncio.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_asyncio.py
new file mode 100644
index 00000000..f2136515
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_asyncio.py
@@ -0,0 +1,296 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import asyncio
+from collections.abc import AsyncIterator
+import functools
+import logging
+from typing import (
+    Any,
+    Optional,
+    AsyncIterator as AsyncIteratorType,
+    Union,
+    TYPE_CHECKING,
+    overload,
+    Type,
+    MutableMapping,
+)
+from types import TracebackType
+from urllib3.exceptions import (
+    ProtocolError,
+    NewConnectionError,
+    ConnectTimeoutError,
+)
+import requests
+
+from azure.core.exceptions import (
+    ServiceRequestError,
+    ServiceResponseError,
+    IncompleteReadError,
+    HttpResponseError,
+)
+from azure.core.pipeline import Pipeline
+from ._base import HttpRequest
+from ._base_async import (
+    AsyncHttpResponse,
+    _ResponseStopIteration,
+    _iterate_response_content,
+)
+from ._requests_basic import (
+    RequestsTransportResponse,
+    _read_raw_stream,
+    AzureErrorUnion,
+)
+from ._base_requests_async import RequestsAsyncTransportBase
+from .._tools import is_rest as _is_rest
+from .._tools_async import (
+    handle_no_stream_rest_response as _handle_no_stream_rest_response,
+)
+
+if TYPE_CHECKING:
+    from ...rest import (
+        HttpRequest as RestHttpRequest,
+        AsyncHttpResponse as RestAsyncHttpResponse,
+    )
+
+_LOGGER = logging.getLogger(__name__)
+
+
def _get_running_loop():
    """Return the asyncio event loop running in the current thread."""
    loop = asyncio.get_running_loop()
    return loop
+
+
class AsyncioRequestsTransport(RequestsAsyncTransportBase):
    """Identical implementation as the synchronous RequestsTransport wrapped in a class with
    asynchronous methods. Uses the built-in asyncio event loop.

    .. admonition:: Example:

        .. literalinclude:: ../samples/test_example_async.py
            :start-after: [START asyncio]
            :end-before: [END asyncio]
            :language: python
            :dedent: 4
            :caption: Asynchronous transport with asyncio.
    """

    async def __aenter__(self):
        # Delegates to the synchronous context-manager entry of the base transport.
        return super(AsyncioRequestsTransport, self).__enter__()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        return super(AsyncioRequestsTransport, self).__exit__(exc_type, exc_value, traceback)

    async def sleep(self, duration):  # pylint:disable=invalid-overridden-method
        await asyncio.sleep(duration)

    @overload  # type: ignore
    async def send(  # pylint:disable=invalid-overridden-method
        self, request: HttpRequest, *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any
    ) -> AsyncHttpResponse:
        """Send the request using this HTTP sender.

        :param request: The HttpRequest
        :type request: ~azure.core.pipeline.transport.HttpRequest
        :return: The AsyncHttpResponse
        :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse

        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
        """

    @overload
    async def send(  # pylint:disable=invalid-overridden-method
        self, request: "RestHttpRequest", *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any
    ) -> "RestAsyncHttpResponse":
        """Send a `azure.core.rest` request using this HTTP sender.

        :param request: The HttpRequest
        :type request: ~azure.core.rest.HttpRequest
        :return: The AsyncHttpResponse
        :rtype: ~azure.core.rest.AsyncHttpResponse

        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
        """

    async def send(  # pylint:disable=invalid-overridden-method
        self,
        request: Union[HttpRequest, "RestHttpRequest"],
        *,
        proxies: Optional[MutableMapping[str, str]] = None,
        **kwargs
    ) -> Union[AsyncHttpResponse, "RestAsyncHttpResponse"]:
        """Send the request using this HTTP sender.

        :param request: The HttpRequest
        :type request: ~azure.core.pipeline.transport.HttpRequest
        :return: The AsyncHttpResponse
        :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse

        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
        """
        # NOTE(review): open() here resolves via the MRO to the synchronous
        # RequestsTransport implementation, hence no await — confirm if bases change.
        self.open()
        # A caller may inject an explicit event loop via kwargs; default to the running one.
        loop = kwargs.get("loop", _get_running_loop())
        response = None
        error: Optional[AzureErrorUnion] = None
        # An async-generator body must be drained before requests can consume it.
        data_to_send = await self._retrieve_request_data(request)
        try:
            # requests is synchronous: run the blocking Session.request call in the
            # default thread-pool executor so the event loop is not blocked.
            response = await loop.run_in_executor(
                None,
                functools.partial(
                    self.session.request,
                    request.method,
                    request.url,
                    headers=request.headers,
                    data=data_to_send,
                    files=request.files,
                    verify=kwargs.pop("connection_verify", self.connection_config.verify),
                    timeout=kwargs.pop("connection_timeout", self.connection_config.timeout),
                    cert=kwargs.pop("connection_cert", self.connection_config.cert),
                    allow_redirects=False,
                    proxies=proxies,
                    **kwargs
                ),
            )
            response.raw.enforce_content_length = True

        # Map transport-level failures onto the azure-core exception hierarchy.
        except (
            NewConnectionError,
            ConnectTimeoutError,
        ) as err:
            error = ServiceRequestError(err, error=err)
        except requests.exceptions.ReadTimeout as err:
            error = ServiceResponseError(err, error=err)
        except requests.exceptions.ConnectionError as err:
            if err.args and isinstance(err.args[0], ProtocolError):
                error = ServiceResponseError(err, error=err)
            else:
                error = ServiceRequestError(err, error=err)
        except requests.exceptions.ChunkedEncodingError as err:
            # requests raises ChunkedEncodingError both for truncated bodies and for
            # other streaming failures; the message text is the only discriminator.
            msg = err.__str__()
            if "IncompleteRead" in msg:
                _LOGGER.warning("Incomplete download: %s", err)
                error = IncompleteReadError(err, error=err)
            else:
                _LOGGER.warning("Unable to stream download: %s", err)
                error = HttpResponseError(err, error=err)
        except requests.RequestException as err:
            error = ServiceRequestError(err, error=err)

        if error:
            raise error
        if _is_rest(request):
            # azure.core.rest requests are wrapped in the rest response type.
            from azure.core.rest._requests_asyncio import (
                RestAsyncioRequestsTransportResponse,
            )

            retval = RestAsyncioRequestsTransportResponse(
                request=request,
                internal_response=response,
                block_size=self.connection_config.data_block_size,
            )
            # Eagerly load the body unless streaming was requested.
            if not kwargs.get("stream"):
                await _handle_no_stream_rest_response(retval)
            return retval

        return AsyncioRequestsTransportResponse(request, response, self.connection_config.data_block_size)
+
+
class AsyncioStreamDownloadGenerator(AsyncIterator):
    """Async iterator yielding chunks of the response body.

    :param pipeline: The pipeline object
    :type pipeline: ~azure.core.pipeline.AsyncPipeline
    :param response: The response object.
    :type response: ~azure.core.pipeline.transport.AsyncHttpResponse
    :keyword bool decompress: If True which is default, will attempt to decode the body based
            on the *content-encoding* header.
    """

    def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse, **kwargs) -> None:
        decompress = kwargs.pop("decompress", True)
        if kwargs:
            raise TypeError("Got an unexpected keyword argument: {}".format(next(iter(kwargs))))
        self.pipeline = pipeline
        self.request = response.request
        self.response = response
        self.block_size = response.block_size
        raw = response.internal_response
        if decompress:
            self.iter_content_func = raw.iter_content(self.block_size)
        else:
            self.iter_content_func = _read_raw_stream(raw, self.block_size)
        self.content_length = int(response.headers.get("Content-Length", 0))

    def __len__(self):
        return self.content_length

    async def __anext__(self):
        loop = _get_running_loop()
        raw = self.response.internal_response
        try:
            # The blocking iteration step runs in the default executor; exhaustion
            # arrives as _ResponseStopIteration because StopIteration cannot cross
            # a Future boundary.
            part = await loop.run_in_executor(
                None,
                _iterate_response_content,
                self.iter_content_func,
            )
        except _ResponseStopIteration:
            raw.close()
            raise StopAsyncIteration()  # pylint: disable=raise-missing-from
        except requests.exceptions.StreamConsumedError:
            raise
        except requests.exceptions.ChunkedEncodingError as err:
            if "IncompleteRead" in str(err):
                _LOGGER.warning("Incomplete download: %s", err)
                raw.close()
                raise IncompleteReadError(err, error=err) from err
            _LOGGER.warning("Unable to stream download: %s", err)
            raw.close()
            raise HttpResponseError(err, error=err) from err
        except Exception as err:
            _LOGGER.warning("Unable to stream download: %s", err)
            raw.close()
            raise
        if not part:
            # An empty chunk marks the end of the stream.
            raw.close()
            raise StopAsyncIteration()
        return part
+
+
class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse):  # type: ignore
    """Asynchronous streaming of data from the response."""

    def stream_download(self, pipeline, **kwargs) -> AsyncIteratorType[bytes]:  # type: ignore
        """Stream the response body as an async iterator.

        :param pipeline: The pipeline object
        :type pipeline: ~azure.core.pipeline.AsyncPipeline
        :rtype: AsyncIterator[bytes]
        :return: An async iterator of bytes chunks
        """
        return AsyncioStreamDownloadGenerator(pipeline=pipeline, response=self, **kwargs)
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_basic.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_basic.py
new file mode 100644
index 00000000..7cfe556f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_basic.py
@@ -0,0 +1,421 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import logging
+from typing import (
+    Iterator,
+    Optional,
+    Union,
+    TypeVar,
+    overload,
+    TYPE_CHECKING,
+    MutableMapping,
+)
+from urllib3.util.retry import Retry
+from urllib3.exceptions import (
+    DecodeError as CoreDecodeError,
+    ReadTimeoutError,
+    ProtocolError,
+    NewConnectionError,
+    ConnectTimeoutError,
+)
+import requests
+
+from azure.core.configuration import ConnectionConfiguration
+from azure.core.exceptions import (
+    ServiceRequestError,
+    ServiceResponseError,
+    IncompleteReadError,
+    HttpResponseError,
+    DecodeError,
+)
+from . import HttpRequest
+
+from ._base import HttpTransport, HttpResponse, _HttpResponseBase
+from ._bigger_block_size_http_adapters import BiggerBlockSizeHTTPAdapter
+from .._tools import (
+    is_rest as _is_rest,
+    handle_non_stream_rest_response as _handle_non_stream_rest_response,
+)
+
+if TYPE_CHECKING:
+    from ...rest import HttpRequest as RestHttpRequest, HttpResponse as RestHttpResponse
+
+AzureErrorUnion = Union[
+    ServiceRequestError,
+    ServiceResponseError,
+    IncompleteReadError,
+    HttpResponseError,
+]
+
+PipelineType = TypeVar("PipelineType")
+
+_LOGGER = logging.getLogger(__name__)
+
+
def _read_raw_stream(response, chunk_size=1):
    """Yield the response body in raw (non-decoded) chunks of *chunk_size* bytes.

    Prefers urllib3's ``stream`` when the raw object exposes it, translating
    urllib3 errors into azure-core exceptions; otherwise falls back to plain
    ``read`` on the file-like object.
    """
    raw = response.raw
    if hasattr(raw, "stream"):
        # urllib3 raw object: delegate, mapping its exceptions.
        try:
            yield from raw.stream(chunk_size, decode_content=False)
        except ProtocolError as e:
            raise ServiceResponseError(e, error=e) from e
        except CoreDecodeError as e:
            raise DecodeError(e, error=e) from e
        except ReadTimeoutError as e:
            raise ServiceRequestError(e, error=e) from e
    else:
        # Standard file-like object: read until EOF.
        while True:
            piece = raw.read(chunk_size)
            if not piece:
                break
            yield piece

    # following behavior from requests iter_content, we set content consumed to True
    # https://github.com/psf/requests/blob/master/requests/models.py#L774
    response._content_consumed = True  # pylint: disable=protected-access
+
+
class _RequestsTransportResponseBase(_HttpResponseBase):
    """Base class for accessing response data.

    :param HttpRequest request: The request.
    :param requests_response: The object returned from the HTTP library.
    :type requests_response: requests.Response
    :param int block_size: Size in bytes.
    """

    def __init__(self, request, requests_response, block_size=None):
        super().__init__(request, requests_response, block_size=block_size)
        self.status_code = requests_response.status_code
        self.headers = requests_response.headers
        self.reason = requests_response.reason
        self.content_type = requests_response.headers.get("content-type")

    def body(self):
        # Full body as bytes, as buffered by requests.
        return self.internal_response.content

    def text(self, encoding: Optional[str] = None) -> str:
        """Return the whole body as a string.

        If encoding is not provided, mostly rely on requests auto-detection, except
        for BOM, that requests ignores. If we see a UTF8 BOM, we assumes UTF8 unlike requests.

        :param str encoding: The encoding to apply.
        :rtype: str
        :return: The body as text.
        """
        raw = self.internal_response
        if not encoding:
            # requests' charset magic misbehaves in a few cases:
            # - https://github.com/psf/requests/issues/654
            # - https://github.com/psf/requests/issues/1737
            # - https://github.com/psf/requests/issues/2086
            # so detect a UTF-8 BOM ourselves.
            from codecs import BOM_UTF8

            if raw.content[:3] == BOM_UTF8:
                encoding = "utf-8-sig"

        if encoding:
            # utf-8-sig also strips a BOM when plain utf-8 was requested.
            raw.encoding = "utf-8-sig" if encoding == "utf-8" else encoding

        return raw.text
+
+
class StreamDownloadGenerator:
    """Iterator yielding chunks of the response body.

    :param pipeline: The pipeline object
    :type pipeline: ~azure.core.pipeline.Pipeline
    :param response: The response object.
    :type response: ~azure.core.pipeline.transport.HttpResponse
    :keyword bool decompress: If True which is default, will attempt to decode the body based
        on the *content-encoding* header.
    """

    def __init__(self, pipeline, response, **kwargs):
        self.pipeline = pipeline
        self.request = response.request
        self.response = response
        self.block_size = response.block_size
        decompress = kwargs.pop("decompress", True)
        if kwargs:
            raise TypeError("Got an unexpected keyword argument: {}".format(next(iter(kwargs))))
        raw = response.internal_response
        if decompress:
            self.iter_content_func = raw.iter_content(self.block_size)
        else:
            self.iter_content_func = _read_raw_stream(raw, self.block_size)
        self.content_length = int(response.headers.get("Content-Length", 0))

    def __len__(self):
        return self.content_length

    def __iter__(self):
        return self

    def __next__(self):
        raw = self.response.internal_response
        try:
            part = next(self.iter_content_func)
        except StopIteration:
            raw.close()
            raise
        except requests.exceptions.StreamConsumedError:
            raise
        except requests.exceptions.ContentDecodingError as err:
            raise DecodeError(err, error=err) from err
        except requests.exceptions.ChunkedEncodingError as err:
            if "IncompleteRead" in str(err):
                _LOGGER.warning("Incomplete download: %s", err)
                raw.close()
                raise IncompleteReadError(err, error=err) from err
            _LOGGER.warning("Unable to stream download: %s", err)
            raw.close()
            raise HttpResponseError(err, error=err) from err
        except Exception as err:
            _LOGGER.warning("Unable to stream download: %s", err)
            raw.close()
            raise
        if not part:
            # An empty chunk marks the end of the stream.
            raw.close()
            raise StopIteration()
        return part

    next = __next__  # Python 2 compatibility.
+
+
+class RequestsTransportResponse(HttpResponse, _RequestsTransportResponseBase):
+    """Streaming of data from the response."""
+
+    def stream_download(self, pipeline: PipelineType, **kwargs) -> Iterator[bytes]:
+        """Generator for streaming request body data.
+
+        :param pipeline: The pipeline object
+        :type pipeline: ~azure.core.pipeline.Pipeline
+        :rtype: iterator[bytes]
+        :return: The stream of data
+        """
+        return StreamDownloadGenerator(pipeline, self, **kwargs)
+
+
+class RequestsTransport(HttpTransport):
+    """Implements a basic requests HTTP sender.
+
+    Since requests team recommends to use one session per requests, you should
+    not consider this class as thread-safe, since it will use one Session
+    per instance.
+
+    In this simple implementation:
+    - You provide the configured session if you want to, or a basic session is created.
+    - All kwargs received by "send" are sent to session.request directly
+
+    :keyword requests.Session session: Request session to use instead of the default one.
+    :keyword bool session_owner: Decide if the session provided by user is owned by this transport. Default to True.
+    :keyword bool use_env_settings: Uses proxy settings from environment. Defaults to True.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/test_example_sync.py
+            :start-after: [START requests]
+            :end-before: [END requests]
+            :language: python
+            :dedent: 4
+            :caption: Synchronous transport with Requests.
+    """
+
+    # URL schemes that _init_session mounts the custom adapter for.
+    _protocols = ["http://", "https://"]
+
+    def __init__(self, **kwargs) -> None:
+        # User-provided session, or None until open() creates one lazily.
+        self.session = kwargs.get("session", None)
+        # When True, this transport is responsible for closing the session in close().
+        self._session_owner = kwargs.get("session_owner", True)
+        if not self._session_owner and not self.session:
+            raise ValueError("session_owner cannot be False if no session is provided")
+        self.connection_config = ConnectionConfiguration(**kwargs)
+        self._use_env_settings = kwargs.pop("use_env_settings", True)
+        # See https://github.com/Azure/azure-sdk-for-python/issues/25640 to understand why we track this
+        self._has_been_opened = False
+
+    def __enter__(self) -> "RequestsTransport":
+        """Open the transport when entering a ``with`` block."""
+        self.open()
+        return self
+
+    def __exit__(self, *args):
+        """Close the transport when leaving a ``with`` block."""
+        self.close()
+
+    def _init_session(self, session: requests.Session) -> None:
+        """Init session level configuration of requests.
+
+        This is initialization I want to do once only on a session.
+
+        :param requests.Session session: The session object.
+        """
+        session.trust_env = self._use_env_settings
+        # Disable urllib3-level automatic retries and redirect following on the adapter.
+        disable_retries = Retry(total=False, redirect=False, raise_on_status=False)
+        adapter = BiggerBlockSizeHTTPAdapter(max_retries=disable_retries)
+        for p in self._protocols:
+            session.mount(p, adapter)
+
+    def open(self):
+        """Create the session if it does not exist yet.
+
+        :raises ValueError: if the transport was opened and later closed, or if no
+            session is available and this transport does not own one.
+        """
+        if self._has_been_opened and not self.session:
+            raise ValueError(
+                "HTTP transport has already been closed. "
+                "You may check if you're calling a function outside of the `with` of your client creation, "
+                "or if you called `close()` on your client already."
+            )
+        if not self.session:
+            if self._session_owner:
+                self.session = requests.Session()
+                self._init_session(self.session)
+            else:
+                raise ValueError("session_owner cannot be False and no session is available")
+        self._has_been_opened = True
+
+    def close(self):
+        """Close the session, but only if this transport owns it."""
+        if self._session_owner and self.session:
+            self.session.close()
+            self.session = None
+
+    @overload
+    def send(
+        self, request: HttpRequest, *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs
+    ) -> HttpResponse:
+        """Send a rest request and get back a rest response.
+
+        :param request: The request object to be sent.
+        :type request: ~azure.core.pipeline.transport.HttpRequest
+        :return: An HTTPResponse object.
+        :rtype: ~azure.core.pipeline.transport.HttpResponse
+
+        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
+        """
+
+    @overload
+    def send(
+        self, request: "RestHttpRequest", *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs
+    ) -> "RestHttpResponse":
+        """Send an `azure.core.rest` request and get back a rest response.
+
+        :param request: The request object to be sent.
+        :type request: ~azure.core.rest.HttpRequest
+        :return: An HTTPResponse object.
+        :rtype: ~azure.core.rest.HttpResponse
+
+        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
+        """
+
+    def send(  # pylint: disable=too-many-statements
+        self,
+        request: Union[HttpRequest, "RestHttpRequest"],
+        *,
+        proxies: Optional[MutableMapping[str, str]] = None,
+        **kwargs
+    ) -> Union[HttpResponse, "RestHttpResponse"]:
+        """Send request object according to configuration.
+
+        :param request: The request object to be sent.
+        :type request: ~azure.core.pipeline.transport.HttpRequest
+        :return: An HTTPResponse object.
+        :rtype: ~azure.core.pipeline.transport.HttpResponse
+
+        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
+        """
+        self.open()
+        response = None
+        error: Optional[AzureErrorUnion] = None
+
+        try:
+            connection_timeout = kwargs.pop("connection_timeout", self.connection_config.timeout)
+
+            if isinstance(connection_timeout, tuple):
+                # Tuple form is the legacy (connect, read) pair and cannot be mixed
+                # with a separate read_timeout keyword.
+                if "read_timeout" in kwargs:
+                    raise ValueError("Cannot set tuple connection_timeout and read_timeout together")
+                _LOGGER.warning("Tuple timeout setting is deprecated")
+                timeout = connection_timeout
+            else:
+                read_timeout = kwargs.pop("read_timeout", self.connection_config.read_timeout)
+                timeout = (connection_timeout, read_timeout)
+            response = self.session.request(  # type: ignore
+                request.method,
+                request.url,
+                headers=request.headers,
+                data=request.data,
+                files=request.files,
+                verify=kwargs.pop("connection_verify", self.connection_config.verify),
+                timeout=timeout,
+                cert=kwargs.pop("connection_cert", self.connection_config.cert),
+                allow_redirects=False,  # NOTE(review): redirects appear to be handled above this layer - confirm
+                proxies=proxies,
+                **kwargs
+            )
+            # Make urllib3 raise if the body is shorter than the advertised Content-Length.
+            response.raw.enforce_content_length = True
+
+        except AttributeError as err:
+            # A closed transport leaves self.session as None; surface a clearer error.
+            if self.session is None:
+                raise ValueError(
+                    "No session available for request. "
+                    "Please report this issue to https://github.com/Azure/azure-sdk-for-python/issues."
+                ) from err
+            raise
+        except (
+            NewConnectionError,
+            ConnectTimeoutError,
+        ) as err:
+            # Connection could not be established at all.
+            error = ServiceRequestError(err, error=err)
+        except requests.exceptions.ReadTimeout as err:
+            error = ServiceResponseError(err, error=err)
+        except requests.exceptions.ConnectionError as err:
+            # A ProtocolError wrapped in a ConnectionError is treated as a
+            # response-phase failure; anything else as a request-phase failure.
+            if err.args and isinstance(err.args[0], ProtocolError):
+                error = ServiceResponseError(err, error=err)
+            else:
+                error = ServiceRequestError(err, error=err)
+        except requests.exceptions.ChunkedEncodingError as err:
+            msg = err.__str__()
+            if "IncompleteRead" in msg:
+                _LOGGER.warning("Incomplete download: %s", err)
+                error = IncompleteReadError(err, error=err)
+            else:
+                _LOGGER.warning("Unable to stream download: %s", err)
+                error = HttpResponseError(err, error=err)
+        except requests.RequestException as err:
+            # Catch-all for remaining requests failures.
+            error = ServiceRequestError(err, error=err)
+
+        if error:
+            raise error
+        if _is_rest(request):
+            # Deferred import - presumably avoids a circular import; confirm.
+            from azure.core.rest._requests_basic import RestRequestsTransportResponse
+
+            retval: RestHttpResponse = RestRequestsTransportResponse(
+                request=request,
+                internal_response=response,
+                block_size=self.connection_config.data_block_size,
+            )
+            if not kwargs.get("stream"):
+                _handle_non_stream_rest_response(retval)
+            return retval
+        return RequestsTransportResponse(request, response, self.connection_config.data_block_size)
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_trio.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_trio.py
new file mode 100644
index 00000000..36d56890
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_trio.py
@@ -0,0 +1,311 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from collections.abc import AsyncIterator
+import functools
+import logging
+from typing import (
+    Any,
+    Optional,
+    AsyncIterator as AsyncIteratorType,
+    TYPE_CHECKING,
+    overload,
+    Type,
+    MutableMapping,
+)
+from types import TracebackType
+from urllib3.exceptions import (
+    ProtocolError,
+    NewConnectionError,
+    ConnectTimeoutError,
+)
+
+import trio
+
+import requests
+
+from azure.core.exceptions import (
+    ServiceRequestError,
+    ServiceResponseError,
+    IncompleteReadError,
+    HttpResponseError,
+)
+from azure.core.pipeline import Pipeline
+from ._base import HttpRequest
+from ._base_async import (
+    AsyncHttpResponse,
+    _ResponseStopIteration,
+    _iterate_response_content,
+)
+from ._requests_basic import (
+    RequestsTransportResponse,
+    _read_raw_stream,
+    AzureErrorUnion,
+)
+from ._base_requests_async import RequestsAsyncTransportBase
+from .._tools import is_rest as _is_rest
+from .._tools_async import (
+    handle_no_stream_rest_response as _handle_no_stream_rest_response,
+)
+
+if TYPE_CHECKING:
+    from ...rest import (
+        HttpRequest as RestHttpRequest,
+        AsyncHttpResponse as RestAsyncHttpResponse,
+    )
+
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class TrioStreamDownloadGenerator(AsyncIterator):
+    """Generator for streaming response data.
+
+    :param pipeline: The pipeline object
+    :type pipeline: ~azure.core.pipeline.AsyncPipeline
+    :param response: The response object.
+    :type response: ~azure.core.pipeline.transport.AsyncHttpResponse
+    :keyword bool decompress: If True which is default, will attempt to decode the body based
+        on the *content-encoding* header.
+    """
+
+    def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse, **kwargs) -> None:
+        self.pipeline = pipeline
+        self.request = response.request
+        self.response = response
+        self.block_size = response.block_size
+        decompress = kwargs.pop("decompress", True)
+        if len(kwargs) > 0:
+            raise TypeError("Got an unexpected keyword argument: {}".format(list(kwargs.keys())[0]))
+        internal_response = response.internal_response
+        if decompress:
+            # Let requests decode the body per the content-encoding header.
+            self.iter_content_func = internal_response.iter_content(self.block_size)
+        else:
+            # Iterate the raw (still-encoded) stream instead.
+            self.iter_content_func = _read_raw_stream(internal_response, self.block_size)
+        self.content_length = int(response.headers.get("Content-Length", 0))
+
+    def __len__(self):
+        # Value of the Content-Length header, 0 when absent.
+        return self.content_length
+
+    async def __anext__(self):
+        """Fetch the next chunk, running the blocking read on a worker thread."""
+        internal_response = self.response.internal_response
+        try:
+            try:
+                # Run the blocking requests iteration off the trio event loop.
+                chunk = await trio.to_thread.run_sync(
+                    _iterate_response_content,
+                    self.iter_content_func,
+                )
+            except AttributeError:  # trio < 0.12.1
+                chunk = await trio.run_sync_in_worker_thread(  # type: ignore  # pylint:disable=no-member
+                    _iterate_response_content,
+                    self.iter_content_func,
+                )
+            if not chunk:
+                # An empty chunk is treated as end of stream.
+                raise _ResponseStopIteration()
+            return chunk
+        except _ResponseStopIteration:
+            # Stream exhausted: release the connection, then end async iteration.
+            internal_response.close()
+            raise StopAsyncIteration()  # pylint: disable=raise-missing-from
+        except requests.exceptions.StreamConsumedError:
+            # Stream was already consumed elsewhere; propagate untouched.
+            raise
+        except requests.exceptions.ChunkedEncodingError as err:
+            # NOTE(review): unlike the sync StreamDownloadGenerator in this package,
+            # ContentDecodingError is not mapped to DecodeError here - confirm intentional.
+            msg = err.__str__()
+            if "IncompleteRead" in msg:
+                _LOGGER.warning("Incomplete download: %s", err)
+                internal_response.close()
+                raise IncompleteReadError(err, error=err) from err
+            _LOGGER.warning("Unable to stream download: %s", err)
+            internal_response.close()
+            raise HttpResponseError(err, error=err) from err
+        except Exception as err:
+            # Any other failure: log, close the connection, and re-raise as-is.
+            _LOGGER.warning("Unable to stream download: %s", err)
+            internal_response.close()
+            raise
+
+
+class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse):  # type: ignore
+    """Asynchronous streaming of data from the response."""
+
+    def stream_download(self, pipeline, **kwargs) -> AsyncIteratorType[bytes]:  # type: ignore
+        """Generator for streaming response data.
+
+        :param pipeline: The pipeline object
+        :type pipeline: ~azure.core.pipeline.AsyncPipeline
+        :rtype: AsyncIterator[bytes]
+        :return: An async iterator of bytes chunks
+        """
+        return TrioStreamDownloadGenerator(pipeline, self, **kwargs)
+
+
+class TrioRequestsTransport(RequestsAsyncTransportBase):
+    """Identical implementation as the synchronous RequestsTransport wrapped in a class with
+    asynchronous methods. Uses the third party trio event loop.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/test_example_async.py
+            :start-after: [START trio]
+            :end-before: [END trio]
+            :language: python
+            :dedent: 4
+            :caption: Asynchronous transport with trio.
+    """
+
+    async def __aenter__(self):
+        return super(TrioRequestsTransport, self).__enter__()
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]] = None,
+        exc_value: Optional[BaseException] = None,
+        traceback: Optional[TracebackType] = None,
+    ) -> None:
+        return super(TrioRequestsTransport, self).__exit__(exc_type, exc_value, traceback)
+
+    async def sleep(self, duration):  # pylint:disable=invalid-overridden-method
+        await trio.sleep(duration)
+
+    @overload  # type: ignore
+    async def send(  # pylint:disable=invalid-overridden-method
+        self, request: HttpRequest, *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any
+    ) -> AsyncHttpResponse:
+        """Send the request using this HTTP sender.
+
+        :param request: The HttpRequest
+        :type request: ~azure.core.pipeline.transport.HttpRequest
+        :return: The AsyncHttpResponse
+        :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse
+
+        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
+        """
+
+    @overload
+    async def send(  # pylint:disable=invalid-overridden-method
+        self, request: "RestHttpRequest", *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any
+    ) -> "RestAsyncHttpResponse":
+        """Send an `azure.core.rest` request using this HTTP sender.
+
+        :param request: The HttpRequest
+        :type request: ~azure.core.rest.HttpRequest
+        :return: The AsyncHttpResponse
+        :rtype: ~azure.core.rest.AsyncHttpResponse
+
+        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
+        """
+
+    async def send(
+        self, request, *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any
+    ):  # pylint:disable=invalid-overridden-method
+        """Send the request using this HTTP sender.
+
+        :param request: The HttpRequest
+        :type request: ~azure.core.pipeline.transport.HttpRequest
+        :return: The AsyncHttpResponse
+        :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse
+
+        :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url)
+        """
+        self.open()
+        trio_limiter = kwargs.get("trio_limiter", None)
+        response = None
+        error: Optional[AzureErrorUnion] = None
+        data_to_send = await self._retrieve_request_data(request)
+        try:
+            try:
+                response = await trio.to_thread.run_sync(
+                    functools.partial(
+                        self.session.request,
+                        request.method,
+                        request.url,
+                        headers=request.headers,
+                        data=data_to_send,
+                        files=request.files,
+                        verify=kwargs.pop("connection_verify", self.connection_config.verify),
+                        timeout=kwargs.pop("connection_timeout", self.connection_config.timeout),
+                        cert=kwargs.pop("connection_cert", self.connection_config.cert),
+                        allow_redirects=False,
+                        proxies=proxies,
+                        **kwargs
+                    ),
+                    limiter=trio_limiter,
+                )
+            except AttributeError:  # trio < 0.12.1
+                response = await trio.run_sync_in_worker_thread(  # type: ignore # pylint:disable=no-member
+                    functools.partial(
+                        self.session.request,
+                        request.method,
+                        request.url,
+                        headers=request.headers,
+                        data=request.data,
+                        files=request.files,
+                        verify=kwargs.pop("connection_verify", self.connection_config.verify),
+                        timeout=kwargs.pop("connection_timeout", self.connection_config.timeout),
+                        cert=kwargs.pop("connection_cert", self.connection_config.cert),
+                        allow_redirects=False,
+                        proxies=proxies,
+                        **kwargs
+                    ),
+                    limiter=trio_limiter,
+                )
+            response.raw.enforce_content_length = True
+
+        except (
+            NewConnectionError,
+            ConnectTimeoutError,
+        ) as err:
+            error = ServiceRequestError(err, error=err)
+        except requests.exceptions.ReadTimeout as err:
+            error = ServiceResponseError(err, error=err)
+        except requests.exceptions.ConnectionError as err:
+            if err.args and isinstance(err.args[0], ProtocolError):
+                error = ServiceResponseError(err, error=err)
+            else:
+                error = ServiceRequestError(err, error=err)
+        except requests.exceptions.ChunkedEncodingError as err:
+            msg = err.__str__()
+            if "IncompleteRead" in msg:
+                _LOGGER.warning("Incomplete download: %s", err)
+                error = IncompleteReadError(err, error=err)
+            else:
+                _LOGGER.warning("Unable to stream download: %s", err)
+                error = HttpResponseError(err, error=err)
+        except requests.RequestException as err:
+            error = ServiceRequestError(err, error=err)
+
+        if error:
+            raise error
+        if _is_rest(request):
+            from azure.core.rest._requests_trio import RestTrioRequestsTransportResponse
+
+            retval = RestTrioRequestsTransportResponse(
+                request=request,
+                internal_response=response,
+                block_size=self.connection_config.data_block_size,
+            )
+            if not kwargs.get("stream"):
+                await _handle_no_stream_rest_response(retval)
+            return retval
+
+        return TrioRequestsTransportResponse(request, response, self.connection_config.data_block_size)