diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/core/pipeline/transport')
9 files changed, 2856 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/__init__.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/__init__.py new file mode 100644 index 00000000..f60e260a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/__init__.py @@ -0,0 +1,120 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import List, Optional, Any +from ._base import HttpTransport, HttpRequest, HttpResponse +from ._base_async import AsyncHttpTransport, AsyncHttpResponse + +# pylint: disable=undefined-all-variable + +__all__ = [ + "HttpTransport", + "HttpRequest", + "HttpResponse", + "RequestsTransport", + "RequestsTransportResponse", + "AsyncHttpTransport", + "AsyncHttpResponse", + "AsyncioRequestsTransport", + "AsyncioRequestsTransportResponse", + "TrioRequestsTransport", + "TrioRequestsTransportResponse", + "AioHttpTransport", + "AioHttpTransportResponse", +] + +# pylint: disable= no-member, too-many-statements + + +def __dir__() -> List[str]: + return __all__ + + +# To do nice overloads, need https://github.com/python/mypy/issues/8203 + + +def __getattr__(name: str): + transport: Optional[Any] = None + if name == "AsyncioRequestsTransport": + try: + from ._requests_asyncio import AsyncioRequestsTransport + + transport = AsyncioRequestsTransport + except ImportError as err: + raise ImportError("requests package is not installed") from err + if name == "AsyncioRequestsTransportResponse": + try: + from ._requests_asyncio import AsyncioRequestsTransportResponse + + transport = AsyncioRequestsTransportResponse + except ImportError as err: + raise ImportError("requests package is not installed") from err + if name == "RequestsTransport": + try: + from ._requests_basic import RequestsTransport + + transport = RequestsTransport + except ImportError as err: + raise ImportError("requests package is not installed") from err + if name == "RequestsTransportResponse": + try: + from ._requests_basic import RequestsTransportResponse + + transport = RequestsTransportResponse + except ImportError as err: + raise ImportError("requests package is not installed") from err + if name == "AioHttpTransport": + try: + from ._aiohttp import AioHttpTransport + + transport = AioHttpTransport + except ImportError as err: + raise ImportError("aiohttp package is not installed") from err + if name == "AioHttpTransportResponse": + try: + from ._aiohttp import AioHttpTransportResponse + + transport = AioHttpTransportResponse + except ImportError as err: + raise ImportError("aiohttp package is not installed") from err + if name == "TrioRequestsTransport": + try: + from ._requests_trio import TrioRequestsTransport + + transport = TrioRequestsTransport + except ImportError as ex: + if ex.msg.endswith("'requests'"): + raise ImportError("requests package is not installed") from ex + raise ImportError("trio package is not installed") from ex + if name == "TrioRequestsTransportResponse": + try: + from ._requests_trio import TrioRequestsTransportResponse + + transport = TrioRequestsTransportResponse + except ImportError as err: + raise ImportError("trio package is not installed") from err + if transport: + return transport + raise AttributeError(f"module 'azure.core.pipeline.transport' has no attribute {name}") diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py new file mode 100644 index 00000000..32d97ad3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py @@ -0,0 +1,571 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from __future__ import annotations +import sys +from typing import ( + Any, + Optional, + AsyncIterator as AsyncIteratorType, + TYPE_CHECKING, + overload, + cast, + Union, + Type, + MutableMapping, +) +from types import TracebackType +from collections.abc import AsyncIterator + +import logging +import asyncio +import codecs +import aiohttp +import aiohttp.client_exceptions +from multidict import CIMultiDict + +from azure.core.configuration import ConnectionConfiguration +from azure.core.exceptions import ( + ServiceRequestError, + ServiceResponseError, + IncompleteReadError, +) +from azure.core.pipeline import AsyncPipeline + +from ._base import HttpRequest +from ._base_async import AsyncHttpTransport, AsyncHttpResponse, _ResponseStopIteration +from ...utils._pipeline_transport_rest_shared import ( + _aiohttp_body_helper, + get_file_items, +) +from .._tools import is_rest as _is_rest +from .._tools_async import ( + handle_no_stream_rest_response as _handle_no_stream_rest_response, +) + +if TYPE_CHECKING: + from ...rest import ( + HttpRequest as RestHttpRequest, + AsyncHttpResponse as RestAsyncHttpResponse, + ) + from ...rest._aiohttp import RestAioHttpTransportResponse + +# Matching requests, because why not? +CONTENT_CHUNK_SIZE = 10 * 1024 +_LOGGER = logging.getLogger(__name__) + + +class AioHttpTransport(AsyncHttpTransport): + """AioHttp HTTP sender implementation. + + Fully asynchronous implementation using the aiohttp library. + + :keyword session: The client session. + :paramtype session: ~aiohttp.ClientSession + :keyword bool session_owner: Session owner. Defaults True. + + :keyword bool use_env_settings: Uses proxy settings from environment. Defaults to True. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_async.py + :start-after: [START aiohttp] + :end-before: [END aiohttp] + :language: python + :dedent: 4 + :caption: Asynchronous transport with aiohttp. + """ + + def __init__( + self, + *, + session: Optional[aiohttp.ClientSession] = None, + loop=None, + session_owner: bool = True, + **kwargs, + ): + if loop and sys.version_info >= (3, 10): + raise ValueError("Starting with Python 3.10, asyncio doesn’t support loop as a parameter anymore") + self._loop = loop + self._session_owner = session_owner + self.session = session + if not self._session_owner and not self.session: + raise ValueError("session_owner cannot be False if no session is provided") + self.connection_config = ConnectionConfiguration(**kwargs) + self._use_env_settings = kwargs.pop("use_env_settings", True) + # See https://github.com/Azure/azure-sdk-for-python/issues/25640 to understand why we track this + self._has_been_opened = False + + async def __aenter__(self): + await self.open() + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + await self.close() + + async def open(self): + if self._has_been_opened and not self.session: + raise ValueError( + "HTTP transport has already been closed. " + "You may check if you're calling a function outside of the `async with` of your client creation, " + "or if you called `await close()` on your client already." + ) + if not self.session: + if self._session_owner: + jar = aiohttp.DummyCookieJar() + clientsession_kwargs = { + "trust_env": self._use_env_settings, + "cookie_jar": jar, + "auto_decompress": False, + } + if self._loop is not None: + clientsession_kwargs["loop"] = self._loop + self.session = aiohttp.ClientSession(**clientsession_kwargs) + else: + raise ValueError("session_owner cannot be False and no session is available") + + self._has_been_opened = True + await self.session.__aenter__() + + async def close(self): + """Closes the connection.""" + if self._session_owner and self.session: + await self.session.close() + self.session = None + + def _build_ssl_config(self, cert, verify): + """Build the SSL configuration. + + :param tuple cert: Cert information + :param bool verify: SSL verification or path to CA file or directory + :rtype: bool or str or ssl.SSLContext + :return: SSL Configuration + """ + ssl_ctx = None + + if cert or verify not in (True, False): + import ssl + + if verify not in (True, False): + ssl_ctx = ssl.create_default_context(cafile=verify) + else: + ssl_ctx = ssl.create_default_context() + if cert: + ssl_ctx.load_cert_chain(*cert) + return ssl_ctx + return verify + + def _get_request_data(self, request): + """Get the request data. + + :param request: The request object + :type request: ~azure.core.pipeline.transport.HttpRequest or ~azure.core.rest.HttpRequest + :rtype: bytes or ~aiohttp.FormData + :return: The request data + """ + if request.files: + form_data = aiohttp.FormData(request.data or {}) + for form_file, data in get_file_items(request.files): + content_type = data[2] if len(data) > 2 else None + try: + form_data.add_field(form_file, data[1], filename=data[0], content_type=content_type) + except IndexError as err: + raise ValueError("Invalid formdata formatting: {}".format(data)) from err + return form_data + return request.data + + @overload + async def send( + self, + request: HttpRequest, + *, + stream: bool = False, + proxies: Optional[MutableMapping[str, str]] = None, + **config: Any, + ) -> AsyncHttpResponse: + """Send the request using this HTTP sender. + + Will pre-load the body into memory to be available with a sync method. + Pass stream=True to avoid this behavior. + + :param request: The HttpRequest object + :type request: ~azure.core.pipeline.transport.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse + + :keyword bool stream: Defaults to False. + :keyword MutableMapping proxies: dict of proxy to used based on protocol. Proxy is a dict (protocol, url) + """ + + @overload + async def send( + self, + request: RestHttpRequest, + *, + stream: bool = False, + proxies: Optional[MutableMapping[str, str]] = None, + **config: Any, + ) -> RestAsyncHttpResponse: + """Send the `azure.core.rest` request using this HTTP sender. + + Will pre-load the body into memory to be available with a sync method. + Pass stream=True to avoid this behavior. + + :param request: The HttpRequest object + :type request: ~azure.core.rest.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.rest.AsyncHttpResponse + + :keyword bool stream: Defaults to False. + :keyword MutableMapping proxies: dict of proxy to used based on protocol. Proxy is a dict (protocol, url) + """ + + async def send( + self, + request: Union[HttpRequest, RestHttpRequest], + *, + stream: bool = False, + proxies: Optional[MutableMapping[str, str]] = None, + **config, + ) -> Union[AsyncHttpResponse, RestAsyncHttpResponse]: + """Send the request using this HTTP sender. + + Will pre-load the body into memory to be available with a sync method. + Pass stream=True to avoid this behavior. + + :param request: The HttpRequest object + :type request: ~azure.core.rest.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.rest.AsyncHttpResponse + + :keyword bool stream: Defaults to False. + :keyword MutableMapping proxies: dict of proxy to used based on protocol. Proxy is a dict (protocol, url) + """ + await self.open() + try: + auto_decompress = self.session.auto_decompress # type: ignore + except AttributeError: + # auto_decompress is introduced in aiohttp 3.7. We need this to handle aiohttp 3.6-. + auto_decompress = False + + proxy = config.pop("proxy", None) + if proxies and not proxy: + # aiohttp needs a single proxy, so iterating until we found the right protocol + + # Sort by longest string first, so "http" is not used for "https" ;-) + for protocol in sorted(proxies.keys(), reverse=True): + if request.url.startswith(protocol): + proxy = proxies[protocol] + break + + response: Optional[Union[AsyncHttpResponse, RestAsyncHttpResponse]] = None + ssl = self._build_ssl_config( + cert=config.pop("connection_cert", self.connection_config.cert), + verify=config.pop("connection_verify", self.connection_config.verify), + ) + # If ssl=True, we just use default ssl context from aiohttp + if ssl is not True: + config["ssl"] = ssl + # If we know for sure there is not body, disable "auto content type" + # Otherwise, aiohttp will send "application/octet-stream" even for empty POST request + # and that break services like storage signature + if not request.data and not request.files: + config["skip_auto_headers"] = ["Content-Type"] + try: + stream_response = stream + timeout = config.pop("connection_timeout", self.connection_config.timeout) + read_timeout = config.pop("read_timeout", self.connection_config.read_timeout) + socket_timeout = aiohttp.ClientTimeout(sock_connect=timeout, sock_read=read_timeout) + result = await self.session.request( # type: ignore + request.method, + request.url, + headers=request.headers, + data=self._get_request_data(request), + timeout=socket_timeout, + allow_redirects=False, + proxy=proxy, + **config, + ) + if _is_rest(request): + from azure.core.rest._aiohttp import RestAioHttpTransportResponse + + response = RestAioHttpTransportResponse( + request=request, + internal_response=result, + block_size=self.connection_config.data_block_size, + decompress=not auto_decompress, + ) + if not stream_response: + await _handle_no_stream_rest_response(response) + else: + # Given the associated "if", this else is legacy implementation + # but mypy do not know it, so using a cast + request = cast(HttpRequest, request) + response = AioHttpTransportResponse( + request, + result, + self.connection_config.data_block_size, + decompress=not auto_decompress, + ) + if not stream_response: + await response.load_body() + except AttributeError as err: + if self.session is None: + raise ValueError( + "No session available for request. " + "Please report this issue to https://github.com/Azure/azure-sdk-for-python/issues." + ) from err + raise + except aiohttp.client_exceptions.ClientResponseError as err: + raise ServiceResponseError(err, error=err) from err + except asyncio.TimeoutError as err: + raise ServiceResponseError(err, error=err) from err + except aiohttp.client_exceptions.ClientError as err: + raise ServiceRequestError(err, error=err) from err + return response + + +class AioHttpStreamDownloadGenerator(AsyncIterator): + """Streams the response body data. + + :param pipeline: The pipeline object + :type pipeline: ~azure.core.pipeline.AsyncPipeline + :param response: The client response object. + :type response: ~azure.core.rest.AsyncHttpResponse + :keyword bool decompress: If True which is default, will attempt to decode the body based + on the *content-encoding* header. + """ + + @overload + def __init__( + self, + pipeline: AsyncPipeline[HttpRequest, AsyncHttpResponse], + response: AioHttpTransportResponse, + *, + decompress: bool = True, + ) -> None: ... + + @overload + def __init__( + self, + pipeline: AsyncPipeline[RestHttpRequest, RestAsyncHttpResponse], + response: RestAioHttpTransportResponse, + *, + decompress: bool = True, + ) -> None: ... + + def __init__( + self, + pipeline: AsyncPipeline, + response: Union[AioHttpTransportResponse, RestAioHttpTransportResponse], + *, + decompress: bool = True, + ) -> None: + self.pipeline = pipeline + self.request = response.request + self.response = response + self.block_size = response.block_size + self._decompress = decompress + internal_response = response.internal_response + self.content_length = int(internal_response.headers.get("Content-Length", 0)) + self._decompressor = None + + def __len__(self): + return self.content_length + + async def __anext__(self): + internal_response = self.response.internal_response + try: + chunk = await internal_response.content.read(self.block_size) + if not chunk: + raise _ResponseStopIteration() + if not self._decompress: + return chunk + enc = internal_response.headers.get("Content-Encoding") + if not enc: + return chunk + enc = enc.lower() + if enc in ("gzip", "deflate"): + if not self._decompressor: + import zlib + + zlib_mode = (16 + zlib.MAX_WBITS) if enc == "gzip" else -zlib.MAX_WBITS + self._decompressor = zlib.decompressobj(wbits=zlib_mode) + chunk = self._decompressor.decompress(chunk) + return chunk + except _ResponseStopIteration: + internal_response.close() + raise StopAsyncIteration() # pylint: disable=raise-missing-from + except aiohttp.client_exceptions.ClientPayloadError as err: + # This is the case that server closes connection before we finish the reading. aiohttp library + # raises ClientPayloadError. + _LOGGER.warning("Incomplete download: %s", err) + internal_response.close() + raise IncompleteReadError(err, error=err) from err + except aiohttp.client_exceptions.ClientResponseError as err: + raise ServiceResponseError(err, error=err) from err + except asyncio.TimeoutError as err: + raise ServiceResponseError(err, error=err) from err + except aiohttp.client_exceptions.ClientError as err: + raise ServiceRequestError(err, error=err) from err + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + internal_response.close() + raise + + +class AioHttpTransportResponse(AsyncHttpResponse): + """Methods for accessing response body data. + + :param request: The HttpRequest object + :type request: ~azure.core.pipeline.transport.HttpRequest + :param aiohttp_response: Returned from ClientSession.request(). + :type aiohttp_response: aiohttp.ClientResponse object + :param block_size: block size of data sent over connection. + :type block_size: int + :keyword bool decompress: If True which is default, will attempt to decode the body based + on the *content-encoding* header. + """ + + def __init__( + self, + request: HttpRequest, + aiohttp_response: aiohttp.ClientResponse, + block_size: Optional[int] = None, + *, + decompress: bool = True, + ) -> None: + super(AioHttpTransportResponse, self).__init__(request, aiohttp_response, block_size=block_size) + # https://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse + self.status_code = aiohttp_response.status + self.headers = CIMultiDict(aiohttp_response.headers) + self.reason = aiohttp_response.reason + self.content_type = aiohttp_response.headers.get("content-type") + self._content = None + self._decompressed_content = False + self._decompress = decompress + + def body(self) -> bytes: + """Return the whole body as bytes in memory. + + :rtype: bytes + :return: The whole response body. + """ + return _aiohttp_body_helper(self) + + def text(self, encoding: Optional[str] = None) -> str: + """Return the whole body as a string. + + If encoding is not provided, rely on aiohttp auto-detection. + + :param str encoding: The encoding to apply. + :rtype: str + :return: The whole response body as a string. + """ + # super().text detects charset based on self._content() which is compressed + # implement the decoding explicitly here + body = self.body() + + ctype = self.headers.get(aiohttp.hdrs.CONTENT_TYPE, "").lower() + mimetype = aiohttp.helpers.parse_mimetype(ctype) + + if not encoding: + # extract encoding from mimetype, if caller does not specify + encoding = mimetype.parameters.get("charset") + if encoding: + try: + codecs.lookup(encoding) + except LookupError: + encoding = None + if not encoding: + if mimetype.type == "application" and mimetype.subtype in ["json", "rdap"]: + # RFC 7159 states that the default encoding is UTF-8. + # RFC 7483 defines application/rdap+json + encoding = "utf-8" + elif body is None: + raise RuntimeError("Cannot guess the encoding of a not yet read body") + else: + try: + import cchardet as chardet + except ImportError: # pragma: no cover + try: + import chardet # type: ignore + except ImportError: # pragma: no cover + import charset_normalizer as chardet # type: ignore[no-redef] + # While "detect" can return a dict of float, in this context this won't happen + # The cast is for pyright to be happy + encoding = cast(Optional[str], chardet.detect(body)["encoding"]) + if encoding == "utf-8" or encoding is None: + encoding = "utf-8-sig" + + return body.decode(encoding) + + async def load_body(self) -> None: + """Load in memory the body, so it could be accessible from sync methods.""" + try: + self._content = await self.internal_response.read() + except aiohttp.client_exceptions.ClientPayloadError as err: + # This is the case that server closes connection before we finish the reading. aiohttp library + # raises ClientPayloadError. + raise IncompleteReadError(err, error=err) from err + except aiohttp.client_exceptions.ClientResponseError as err: + raise ServiceResponseError(err, error=err) from err + except asyncio.TimeoutError as err: + raise ServiceResponseError(err, error=err) from err + except aiohttp.client_exceptions.ClientError as err: + raise ServiceRequestError(err, error=err) from err + + def stream_download( + self, + pipeline: AsyncPipeline[HttpRequest, AsyncHttpResponse], + *, + decompress: bool = True, + **kwargs, + ) -> AsyncIteratorType[bytes]: + """Generator for streaming response body data. + + :param pipeline: The pipeline object + :type pipeline: azure.core.pipeline.AsyncPipeline + :keyword bool decompress: If True which is default, will attempt to decode the body based + on the *content-encoding* header. + :rtype: AsyncIterator[bytes] + :return: An iterator of bytes chunks. + """ + return AioHttpStreamDownloadGenerator(pipeline, self, decompress=decompress, **kwargs) + + def __getstate__(self): + # Be sure body is loaded in memory, otherwise not pickable and let it throw + self.body() + + state = self.__dict__.copy() + # Remove the unpicklable entries. + state["internal_response"] = None # aiohttp response are not pickable (see headers comments) + state["headers"] = CIMultiDict(self.headers) # MultiDictProxy is not pickable + return state diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base.py new file mode 100644 index 00000000..eb3d8fdf --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base.py @@ -0,0 +1,863 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from __future__ import annotations +import abc +from email.message import Message +import json +import logging +import time +import copy +from urllib.parse import urlparse +import xml.etree.ElementTree as ET + +from typing import ( + Generic, + TypeVar, + IO, + Union, + Any, + Mapping, + Optional, + Tuple, + Iterator, + Type, + Dict, + List, + Sequence, + MutableMapping, + ContextManager, + TYPE_CHECKING, +) + +from http.client import HTTPResponse as _HTTPResponse + +from azure.core.exceptions import HttpResponseError +from azure.core.pipeline.policies import SansIOHTTPPolicy +from ...utils._utils import case_insensitive_dict +from ...utils._pipeline_transport_rest_shared import ( + _format_parameters_helper, + _prepare_multipart_body_helper, + _serialize_request, + _format_data_helper, + BytesIOSocket, + _decode_parts_helper, + _get_raw_parts_helper, + _parts_helper, +) + + +HTTPResponseType = TypeVar("HTTPResponseType") +HTTPRequestType = TypeVar("HTTPRequestType") +DataType = Union[bytes, str, Dict[str, Union[str, int]]] + +if TYPE_CHECKING: + # We need a transport to define a pipeline, this "if" avoid a circular import + from azure.core.pipeline import Pipeline + from azure.core.rest._helpers import FileContent + +_LOGGER = logging.getLogger(__name__) + +binary_type = str + + +def _format_url_section(template, **kwargs: Dict[str, str]) -> str: + """String format the template with the kwargs, auto-skip sections of the template that are NOT in the kwargs. + + By default in Python, "format" will raise a KeyError if a template element is not found. Here the section between + the slashes will be removed from the template instead. + + This is used for API like Storage, where when Swagger has template section not defined as parameter. + + :param str template: a string template to fill + :rtype: str + :returns: Template completed + """ + last_template = template + components = template.split("/") + while components: + try: + return template.format(**kwargs) + except KeyError as key: + formatted_components = template.split("/") + components = [c for c in formatted_components if "{{{}}}".format(key.args[0]) not in c] + template = "/".join(components) + if last_template == template: + raise ValueError( + f"The value provided for the url part '{template}' was incorrect, and resulted in an invalid url" + ) from key + last_template = template + return last_template + + +def _urljoin(base_url: str, stub_url: str) -> str: + """Append to end of base URL without losing query parameters. + + :param str base_url: The base URL. + :param str stub_url: Section to append to the end of the URL path. + :returns: The updated URL. + :rtype: str + """ + parsed_base_url = urlparse(base_url) + + # Can't use "urlparse" on a partial url, we get incorrect parsing for things like + # document:build?format=html&api-version=2019-05-01 + split_url = stub_url.split("?", 1) + stub_url_path = split_url.pop(0) + stub_url_query = split_url.pop() if split_url else None + + # Note that _replace is a public API named that way to avoid conflicts in namedtuple + # https://docs.python.org/3/library/collections.html?highlight=namedtuple#collections.namedtuple + parsed_base_url = parsed_base_url._replace( + path=parsed_base_url.path.rstrip("/") + "/" + stub_url_path, + ) + if stub_url_query: + query_params = [stub_url_query] + if parsed_base_url.query: + query_params.insert(0, parsed_base_url.query) + parsed_base_url = parsed_base_url._replace(query="&".join(query_params)) + return parsed_base_url.geturl() + + +class HttpTransport(ContextManager["HttpTransport"], abc.ABC, Generic[HTTPRequestType, HTTPResponseType]): + """An http sender ABC.""" + + @abc.abstractmethod + def send(self, request: HTTPRequestType, **kwargs: Any) -> HTTPResponseType: + """Send the request using this HTTP sender. + + :param request: The pipeline request object + :type request: ~azure.core.transport.HTTPRequest + :return: The pipeline response object. + :rtype: ~azure.core.pipeline.transport.HttpResponse + """ + + @abc.abstractmethod + def open(self) -> None: + """Assign new session if one does not already exist.""" + + @abc.abstractmethod + def close(self) -> None: + """Close the session if it is not externally owned.""" + + def sleep(self, duration: float) -> None: + """Sleep for the specified duration. + + You should always ask the transport to sleep, and not call directly + the stdlib. This is mostly important in async, as the transport + may not use asyncio but other implementations like trio and they have their own + way to sleep, but to keep design + consistent, it's cleaner to always ask the transport to sleep and let the transport + implementor decide how to do it. + + :param float duration: The number of seconds to sleep. + """ + time.sleep(duration) + + +class HttpRequest: + """Represents an HTTP request. + + URL can be given without query parameters, to be added later using "format_parameters". + + :param str method: HTTP method (GET, HEAD, etc.) + :param str url: At least complete scheme/host/path + :param dict[str,str] headers: HTTP headers + :param files: Dictionary of ``'name': file-like-objects`` (or ``{'name': file-tuple}``) for multipart + encoding upload. ``file-tuple`` can be a 2-tuple ``('filename', fileobj)``, 3-tuple + ``('filename', fileobj, 'content_type')`` or a 4-tuple + ``('filename', fileobj, 'content_type', custom_headers)``, where ``'content_type'`` is a string + defining the content type of the given file and ``custom_headers`` + a dict-like object containing additional headers to add for the file. + :type files: dict[str, tuple[str, IO, str, dict]] or dict[str, IO] + :param data: Body to be sent. + :type data: bytes or dict (for form) + """ + + def __init__( + self, + method: str, + url: str, + headers: Optional[Mapping[str, str]] = None, + files: Optional[Any] = None, + data: Optional[DataType] = None, + ) -> None: + self.method = method + self.url = url + self.headers: MutableMapping[str, str] = case_insensitive_dict(headers) + self.files: Optional[Any] = files + self.data: Optional[DataType] = data + self.multipart_mixed_info: Optional[Tuple[Sequence[Any], Sequence[Any], Optional[str], Dict[str, Any]]] = None + + def __repr__(self) -> str: + return "<HttpRequest [{}], url: '{}'>".format(self.method, self.url) + + def __deepcopy__(self, memo: Optional[Dict[int, Any]] = None) -> "HttpRequest": + try: + data = copy.deepcopy(self.body, memo) + files = copy.deepcopy(self.files, memo) + request = HttpRequest(self.method, self.url, self.headers, files, data) + request.multipart_mixed_info = self.multipart_mixed_info + return request + except (ValueError, TypeError): + return copy.copy(self) + + @property + def query(self) -> Dict[str, str]: + """The query parameters of the request as a dict. + + :rtype: dict[str, str] + :return: The query parameters of the request as a dict. + """ + query = urlparse(self.url).query + if query: + return {p[0]: p[-1] for p in [p.partition("=") for p in query.split("&")]} + return {} + + @property + def body(self) -> Optional[DataType]: + """Alias to data. + + :rtype: bytes or str or dict or None + :return: The body of the request. + """ + return self.data + + @body.setter + def body(self, value: Optional[DataType]) -> None: + self.data = value + + @staticmethod + def _format_data(data: Union[str, IO]) -> Union[Tuple[Optional[str], str], Tuple[Optional[str], FileContent, str]]: + """Format field data according to whether it is a stream or + a string for a form-data request. + + :param data: The request field data. + :type data: str or file-like object. + :rtype: tuple[str, IO, str] or tuple[None, str] + :return: A tuple of (data name, data IO, "application/octet-stream") or (None, data str) + """ + return _format_data_helper(data) + + def format_parameters(self, params: Dict[str, str]) -> None: + """Format parameters into a valid query string. + It's assumed all parameters have already been quoted as + valid URL strings. + + :param dict params: A dictionary of parameters. + """ + return _format_parameters_helper(self, params) + + def set_streamed_data_body(self, data: Any) -> None: + """Set a streamable data body. + + :param data: The request field data. + :type data: stream or generator or asyncgenerator + """ + if not isinstance(data, binary_type) and not any( + hasattr(data, attr) for attr in ["read", "__iter__", "__aiter__"] + ): + raise TypeError("A streamable data source must be an open file-like object or iterable.") + self.data = data + self.files = None + + def set_text_body(self, data: str) -> None: + """Set a text as body of the request. + + :param data: A text to send as body. + :type data: str + """ + if data is None: + self.data = None + else: + self.data = data + self.headers["Content-Length"] = str(len(self.data)) + self.files = None + + def set_xml_body(self, data: Any) -> None: + """Set an XML element tree as the body of the request. + + :param data: The request field data. + :type data: XML node + """ + if data is None: + self.data = None + else: + bytes_data: bytes = ET.tostring(data, encoding="utf8") + self.data = bytes_data.replace(b"encoding='utf8'", b"encoding='utf-8'") + self.headers["Content-Length"] = str(len(self.data)) + self.files = None + + def set_json_body(self, data: Any) -> None: + """Set a JSON-friendly object as the body of the request. + + :param data: A JSON serializable object + :type data: dict + """ + if data is None: + self.data = None + else: + self.data = json.dumps(data) + self.headers["Content-Length"] = str(len(self.data)) + self.files = None + + def set_formdata_body(self, data: Optional[Dict[str, str]] = None) -> None: + """Set form-encoded data as the body of the request. + + :param data: The request field data. + :type data: dict + """ + if data is None: + data = {} + content_type = self.headers.pop("Content-Type", None) if self.headers else None + + if content_type and content_type.lower() == "application/x-www-form-urlencoded": + self.data = {f: d for f, d in data.items() if d is not None} + self.files = None + else: # Assume "multipart/form-data" + self.files = {f: self._format_data(d) for f, d in data.items() if d is not None} + self.data = None + + def set_bytes_body(self, data: bytes) -> None: + """Set generic bytes as the body of the request. + + Will set content-length. + + :param data: The request field data. + :type data: bytes + """ + if data: + self.headers["Content-Length"] = str(len(data)) + self.data = data + self.files = None + + def set_multipart_mixed( + self, + *requests: "HttpRequest", + policies: Optional[List[SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]]] = None, + boundary: Optional[str] = None, + **kwargs: Any, + ) -> None: + """Set the part of a multipart/mixed. + + Only supported args for now are HttpRequest objects. + + boundary is optional, and one will be generated if you don't provide one. + Note that no verification are made on the boundary, this is considered advanced + enough so you know how to respect RFC1341 7.2.1 and provide a correct boundary. + + Any additional kwargs will be passed into the pipeline context for per-request policy + configuration. + + :param requests: The requests to add to the multipart/mixed + :type requests: ~azure.core.pipeline.transport.HttpRequest + :keyword list[SansIOHTTPPolicy] policies: SansIOPolicy to apply at preparation time + :keyword str boundary: Optional boundary + """ + policies = policies or [] + self.multipart_mixed_info = ( + requests, + policies, + boundary, + kwargs, + ) + + def prepare_multipart_body(self, content_index: int = 0) -> int: + """Will prepare the body of this request according to the multipart information. + + This call assumes the on_request policies have been applied already in their + correct context (sync/async) + + Does nothing if "set_multipart_mixed" was never called. + + :param int content_index: The current index of parts within the batch message. + :returns: The updated index after all parts in this request have been added. + :rtype: int + """ + return _prepare_multipart_body_helper(self, content_index) + + def serialize(self) -> bytes: + """Serialize this request using application/http spec. + + :rtype: bytes + :return: The requests serialized as HTTP low-level message in bytes. + """ + return _serialize_request(self) + + +class _HttpResponseBase: + """Represent a HTTP response. + + No body is defined here on purpose, since async pipeline + will provide async ways to access the body + Full in-memory using "body" as bytes. + + :param request: The request. + :type request: ~azure.core.pipeline.transport.HttpRequest + :param internal_response: The object returned from the HTTP library. + :type internal_response: any + :param int block_size: Defaults to 4096 bytes. + """ + + def __init__( + self, + request: "HttpRequest", + internal_response: Any, + block_size: Optional[int] = None, + ) -> None: + self.request: HttpRequest = request + self.internal_response = internal_response + # This is actually never None, and set by all implementations after the call to + # __init__ of this class. This class is also a legacy impl, so it's risky to change it + # for low benefits The new "rest" implementation does define correctly status_code + # as non-optional. + self.status_code: int = None # type: ignore + self.headers: MutableMapping[str, str] = {} + self.reason: Optional[str] = None + self.content_type: Optional[str] = None + self.block_size: int = block_size or 4096 # Default to same as Requests + + def body(self) -> bytes: + """Return the whole body as bytes in memory. + + Sync implementer should load the body in memory if they can. + Async implementer should rely on async load_body to have been called first. + + :rtype: bytes + :return: The whole body as bytes in memory. + """ + raise NotImplementedError() + + def text(self, encoding: Optional[str] = None) -> str: + """Return the whole body as a string. + + .. seealso:: ~body() + + :param str encoding: The encoding to apply. If None, use "utf-8" with BOM parsing (utf-8-sig). + Implementation can be smarter if they want (using headers or chardet). + :rtype: str + :return: The whole body as a string. + """ + if encoding == "utf-8" or encoding is None: + encoding = "utf-8-sig" + return self.body().decode(encoding) + + def _decode_parts( + self, + message: Message, + http_response_type: Type["_HttpResponseBase"], + requests: Sequence[HttpRequest], + ) -> List["HttpResponse"]: + """Rebuild an HTTP response from pure string. + + :param ~email.message.Message message: The HTTP message as an email object + :param type http_response_type: The type of response to return + :param list[HttpRequest] requests: The requests that were batched together + :rtype: list[HttpResponse] + :return: The list of HttpResponse + """ + return _decode_parts_helper(self, message, http_response_type, requests, _deserialize_response) + + def _get_raw_parts( + self, http_response_type: Optional[Type["_HttpResponseBase"]] = None + ) -> Iterator["HttpResponse"]: + """Assuming this body is multipart, return the iterator or parts. + + If parts are application/http use http_response_type or HttpClientTransportResponse + as envelope. + + :param type http_response_type: The type of response to return + :rtype: iterator[HttpResponse] + :return: The iterator of HttpResponse + """ + return _get_raw_parts_helper(self, http_response_type or HttpClientTransportResponse) + + def raise_for_status(self) -> None: + """Raises an HttpResponseError if the response has an error status code. + If response is good, does nothing. + """ + if not self.status_code or self.status_code >= 400: + raise HttpResponseError(response=self) + + def __repr__(self) -> str: + content_type_str = ", Content-Type: {}".format(self.content_type) if self.content_type else "" + return "<{}: {} {}{}>".format(type(self).__name__, self.status_code, self.reason, content_type_str) + + +class HttpResponse(_HttpResponseBase): + def stream_download(self, pipeline: Pipeline[HttpRequest, "HttpResponse"], **kwargs: Any) -> Iterator[bytes]: + """Generator for streaming request body data. + + Should be implemented by sub-classes if streaming download + is supported. + + :param pipeline: The pipeline object + :type pipeline: ~azure.core.pipeline.Pipeline + :rtype: iterator[bytes] + :return: The generator of bytes connected to the socket + """ + raise NotImplementedError("stream_download is not implemented.") + + def parts(self) -> Iterator["HttpResponse"]: + """Assuming the content-type is multipart/mixed, will return the parts as an iterator. + + :rtype: iterator[HttpResponse] + :return: The iterator of HttpResponse if request was multipart/mixed + :raises ValueError: If the content is not multipart/mixed + """ + return _parts_helper(self) + + +class _HttpClientTransportResponse(_HttpResponseBase): + """Create a HTTPResponse from an http.client response. + + Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary. + + :param HttpRequest request: The request. + :param httpclient_response: The object returned from an HTTP(S)Connection from http.client + :type httpclient_response: http.client.HTTPResponse + """ + + def __init__(self, request, httpclient_response): + super(_HttpClientTransportResponse, self).__init__(request, httpclient_response) + self.status_code = httpclient_response.status + self.headers = case_insensitive_dict(httpclient_response.getheaders()) + self.reason = httpclient_response.reason + self.content_type = self.headers.get("Content-Type") + self.data = None + + def body(self): + if self.data is None: + self.data = self.internal_response.read() + return self.data + + +class HttpClientTransportResponse(_HttpClientTransportResponse, HttpResponse): # pylint: disable=abstract-method + """Create a HTTPResponse from an http.client response. + + Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary. + """ + + +def _deserialize_response(http_response_as_bytes, http_request, http_response_type=HttpClientTransportResponse): + """Deserialize a HTTPResponse from a string. + + :param bytes http_response_as_bytes: The HTTP response as bytes. + :param HttpRequest http_request: The request to store in the response. + :param type http_response_type: The type of response to return + :rtype: HttpResponse + :return: The HTTP response from those low-level bytes. + """ + local_socket = BytesIOSocket(http_response_as_bytes) + response = _HTTPResponse(local_socket, method=http_request.method) + response.begin() + return http_response_type(http_request, response) + + +class PipelineClientBase: + """Base class for pipeline clients. + + :param str base_url: URL for the request. + """ + + def __init__(self, base_url: str): + self._base_url = base_url + + def _request( + self, + method: str, + url: str, + params: Optional[Dict[str, str]], + headers: Optional[Dict[str, str]], + content: Any, + form_content: Optional[Dict[str, Any]], + stream_content: Any, + ) -> HttpRequest: + """Create HttpRequest object. + + If content is not None, guesses will be used to set the right body: + - If content is an XML tree, will serialize as XML + - If content-type starts by "text/", set the content as text + - Else, try JSON serialization + + :param str method: HTTP method (GET, HEAD, etc.) + :param str url: URL for the request. + :param dict params: URL query parameters. + :param dict headers: Headers + :param content: The body content + :type content: bytes or str or dict + :param dict form_content: Form content + :param stream_content: The body content as a stream + :type stream_content: stream or generator or asyncgenerator + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = HttpRequest(method, self.format_url(url)) + + if params: + request.format_parameters(params) + + if headers: + request.headers.update(headers) + + if content is not None: + content_type = request.headers.get("Content-Type") + if isinstance(content, ET.Element): + request.set_xml_body(content) + # https://github.com/Azure/azure-sdk-for-python/issues/12137 + # A string is valid JSON, make the difference between text + # and a plain JSON string. + # Content-Type is a good indicator of intent from user + elif content_type and content_type.startswith("text/"): + request.set_text_body(content) + else: + try: + request.set_json_body(content) + except TypeError: + request.data = content + + if form_content: + request.set_formdata_body(form_content) + elif stream_content: + request.set_streamed_data_body(stream_content) + + return request + + def format_url(self, url_template: str, **kwargs: Any) -> str: + """Format request URL with the client base URL, unless the + supplied URL is already absolute. + + Note that both the base url and the template url can contain query parameters. + + :param str url_template: The request URL to be formatted if necessary. + :rtype: str + :return: The formatted URL. + """ + url = _format_url_section(url_template, **kwargs) + if url: + parsed = urlparse(url) + if not parsed.scheme or not parsed.netloc: + url = url.lstrip("/") + try: + base = self._base_url.format(**kwargs).rstrip("/") + except KeyError as key: + err_msg = "The value provided for the url part {} was incorrect, and resulted in an invalid url" + raise ValueError(err_msg.format(key.args[0])) from key + + url = _urljoin(base, url) + else: + url = self._base_url.format(**kwargs) + return url + + def get( + self, + url: str, + params: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + content: Any = None, + form_content: Optional[Dict[str, Any]] = None, + ) -> "HttpRequest": + """Create a GET request object. + + :param str url: The request URL. + :param dict params: Request URL parameters. + :param dict headers: Headers + :param content: The body content + :type content: bytes or str or dict + :param dict form_content: Form content + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = self._request("GET", url, params, headers, content, form_content, None) + request.method = "GET" + return request + + def put( + self, + url: str, + params: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + content: Any = None, + form_content: Optional[Dict[str, Any]] = None, + stream_content: Any = None, + ) -> HttpRequest: + """Create a PUT request object. + + :param str url: The request URL. + :param dict params: Request URL parameters. + :param dict headers: Headers + :param content: The body content + :type content: bytes or str or dict + :param dict form_content: Form content + :param stream_content: The body content as a stream + :type stream_content: stream or generator or asyncgenerator + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = self._request("PUT", url, params, headers, content, form_content, stream_content) + return request + + def post( + self, + url: str, + params: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + content: Any = None, + form_content: Optional[Dict[str, Any]] = None, + stream_content: Any = None, + ) -> HttpRequest: + """Create a POST request object. + + :param str url: The request URL. + :param dict params: Request URL parameters. + :param dict headers: Headers + :param content: The body content + :type content: bytes or str or dict + :param dict form_content: Form content + :param stream_content: The body content as a stream + :type stream_content: stream or generator or asyncgenerator + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = self._request("POST", url, params, headers, content, form_content, stream_content) + return request + + def head( + self, + url: str, + params: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + content: Any = None, + form_content: Optional[Dict[str, Any]] = None, + stream_content: Any = None, + ) -> HttpRequest: + """Create a HEAD request object. + + :param str url: The request URL. + :param dict params: Request URL parameters. + :param dict headers: Headers + :param content: The body content + :type content: bytes or str or dict + :param dict form_content: Form content + :param stream_content: The body content as a stream + :type stream_content: stream or generator or asyncgenerator + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = self._request("HEAD", url, params, headers, content, form_content, stream_content) + return request + + def patch( + self, + url: str, + params: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + content: Any = None, + form_content: Optional[Dict[str, Any]] = None, + stream_content: Any = None, + ) -> HttpRequest: + """Create a PATCH request object. + + :param str url: The request URL. + :param dict params: Request URL parameters. + :param dict headers: Headers + :param content: The body content + :type content: bytes or str or dict + :param dict form_content: Form content + :param stream_content: The body content as a stream + :type stream_content: stream or generator or asyncgenerator + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = self._request("PATCH", url, params, headers, content, form_content, stream_content) + return request + + def delete( + self, + url: str, + params: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + content: Any = None, + form_content: Optional[Dict[str, Any]] = None, + ) -> HttpRequest: + """Create a DELETE request object. + + :param str url: The request URL. + :param dict params: Request URL parameters. + :param dict headers: Headers + :param content: The body content + :type content: bytes or str or dict + :param dict form_content: Form content + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = self._request("DELETE", url, params, headers, content, form_content, None) + return request + + def merge( + self, + url: str, + params: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + content: Any = None, + form_content: Optional[Dict[str, Any]] = None, + ) -> HttpRequest: + """Create a MERGE request object. + + :param str url: The request URL. + :param dict params: Request URL parameters. + :param dict headers: Headers + :param content: The body content + :type content: bytes or str or dict + :param dict form_content: Form content + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = self._request("MERGE", url, params, headers, content, form_content, None) + return request + + def options( + self, # pylint: disable=unused-argument + url: str, + params: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + *, + content: Optional[Union[bytes, str, Dict[Any, Any]]] = None, + form_content: Optional[Dict[Any, Any]] = None, + **kwargs: Any, + ) -> HttpRequest: + """Create a OPTIONS request object. + + :param str url: The request URL. + :param dict params: Request URL parameters. + :param dict headers: Headers + :keyword content: The body content + :type content: bytes or str or dict + :keyword dict form_content: Form content + :return: An HttpRequest object + :rtype: ~azure.core.pipeline.transport.HttpRequest + """ + request = self._request("OPTIONS", url, params, headers, content, form_content, None) + return request diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_async.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_async.py new file mode 100644 index 00000000..f04d955e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_async.py @@ -0,0 +1,171 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from __future__ import annotations +import asyncio +import abc +from collections.abc import AsyncIterator +from typing import ( + AsyncIterator as AsyncIteratorType, + TypeVar, + Generic, + Any, + AsyncContextManager, + Optional, + Type, + TYPE_CHECKING, +) +from types import TracebackType + +from ._base import _HttpResponseBase, _HttpClientTransportResponse, HttpRequest +from ...utils._pipeline_transport_rest_shared_async import _PartGenerator + + +AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType") +HTTPResponseType = TypeVar("HTTPResponseType") +HTTPRequestType = TypeVar("HTTPRequestType") + +if TYPE_CHECKING: + # We need a transport to define a pipeline, this "if" avoid a circular import + from .._base_async import AsyncPipeline + + +class _ResponseStopIteration(Exception): + pass + + +def _iterate_response_content(iterator): + """To avoid the following error from Python: + > TypeError: StopIteration interacts badly with generators and cannot be raised into a Future + + :param iterator: An iterator + :type iterator: iterator + :return: The next item in the iterator + :rtype: any + """ + try: + return next(iterator) + except StopIteration: + raise _ResponseStopIteration() # pylint: disable=raise-missing-from + + +class AsyncHttpResponse(_HttpResponseBase, AsyncContextManager["AsyncHttpResponse"]): + """An AsyncHttpResponse ABC. + + Allows for the asynchronous streaming of data from the response. + """ + + def stream_download( + self, + pipeline: AsyncPipeline[HttpRequest, "AsyncHttpResponse"], + *, + decompress: bool = True, + **kwargs: Any, + ) -> AsyncIteratorType[bytes]: + """Generator for streaming response body data. + + Should be implemented by sub-classes if streaming download + is supported. Will return an asynchronous generator. + + :param pipeline: The pipeline object + :type pipeline: azure.core.pipeline.Pipeline + :keyword bool decompress: If True which is default, will attempt to decode the body based + on the *content-encoding* header. + :return: An async iterator of bytes + :rtype: AsyncIterator[bytes] + """ + raise NotImplementedError("stream_download is not implemented.") + + def parts(self) -> AsyncIterator["AsyncHttpResponse"]: + """Assuming the content-type is multipart/mixed, will return the parts as an async iterator. + + :return: An async iterator of the parts + :rtype: AsyncIterator + :raises ValueError: If the content is not multipart/mixed + """ + if not self.content_type or not self.content_type.startswith("multipart/mixed"): + raise ValueError("You can't get parts if the response is not multipart/mixed") + + return _PartGenerator(self, default_http_response_type=AsyncHttpClientTransportResponse) + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + return None + + +class AsyncHttpClientTransportResponse(_HttpClientTransportResponse, AsyncHttpResponse): + """Create a HTTPResponse from an http.client response. + + Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary. + + :param HttpRequest request: The request. + :param httpclient_response: The object returned from an HTTP(S)Connection from http.client + """ + + +class AsyncHttpTransport( + AsyncContextManager["AsyncHttpTransport"], + abc.ABC, + Generic[HTTPRequestType, AsyncHTTPResponseType], +): + """An http sender ABC.""" + + @abc.abstractmethod + async def send(self, request: HTTPRequestType, **kwargs: Any) -> AsyncHTTPResponseType: + """Send the request using this HTTP sender. + + :param request: The request object. Exact type can be inferred from the pipeline. + :type request: any + :return: The response object. Exact type can be inferred from the pipeline. + :rtype: any + """ + + @abc.abstractmethod + async def open(self) -> None: + """Assign new session if one does not already exist.""" + + @abc.abstractmethod + async def close(self) -> None: + """Close the session if it is not externally owned.""" + + async def sleep(self, duration: float) -> None: + """Sleep for the specified duration. + + You should always ask the transport to sleep, and not call directly + the stdlib. This is mostly important in async, as the transport + may not use asyncio but other implementation like trio and they their own + way to sleep, but to keep design + consistent, it's cleaner to always ask the transport to sleep and let the transport + implementor decide how to do it. + By default, this method will use "asyncio", and don't need to be overridden + if your transport does too. + + :param float duration: The number of seconds to sleep. + """ + await asyncio.sleep(duration) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_requests_async.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_requests_async.py new file mode 100644 index 00000000..15ec81a8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_base_requests_async.py @@ -0,0 +1,55 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import Optional, Type +from types import TracebackType +from ._requests_basic import RequestsTransport +from ._base_async import AsyncHttpTransport + + +class RequestsAsyncTransportBase(RequestsTransport, AsyncHttpTransport): # type: ignore + async def _retrieve_request_data(self, request): + if hasattr(request.data, "__aiter__"): + # Need to consume that async generator, since requests can't do anything with it + # That's not ideal, but a list is our only choice. Memory not optimal here, + # but providing an async generator to a requests based transport is not optimal too + new_data = [] + async for part in request.data: + new_data.append(part) + data_to_send = iter(new_data) + else: + data_to_send = request.data + return data_to_send + + async def __aenter__(self): + return super(RequestsAsyncTransportBase, self).__enter__() + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ): + return super(RequestsAsyncTransportBase, self).__exit__(exc_type, exc_value, traceback) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_bigger_block_size_http_adapters.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_bigger_block_size_http_adapters.py new file mode 100644 index 00000000..d2096773 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_bigger_block_size_http_adapters.py @@ -0,0 +1,48 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- + +import sys +from requests.adapters import HTTPAdapter + + +class BiggerBlockSizeHTTPAdapter(HTTPAdapter): + def get_connection(self, url, proxies=None): + """Returns a urllib3 connection for the given URL. This should not be + called from user code, and is only exposed for use when subclassing the + :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`. + + :param str url: The URL to connect to. + :param MutableMapping proxies: (optional) A Requests-style dictionary of proxies used on this request. + :rtype: urllib3.ConnectionPool + :returns: The urllib3 ConnectionPool for the given URL. + """ + conn = super(BiggerBlockSizeHTTPAdapter, self).get_connection(url, proxies) + system_version = tuple(sys.version_info)[:3] + if system_version[:2] >= (3, 7): + if not conn.conn_kw: + conn.conn_kw = {} + conn.conn_kw["blocksize"] = 32768 + return conn diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_asyncio.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_asyncio.py new file mode 100644 index 00000000..f2136515 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_asyncio.py @@ -0,0 +1,296 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import asyncio +from collections.abc import AsyncIterator +import functools +import logging +from typing import ( + Any, + Optional, + AsyncIterator as AsyncIteratorType, + Union, + TYPE_CHECKING, + overload, + Type, + MutableMapping, +) +from types import TracebackType +from urllib3.exceptions import ( + ProtocolError, + NewConnectionError, + ConnectTimeoutError, +) +import requests + +from azure.core.exceptions import ( + ServiceRequestError, + ServiceResponseError, + IncompleteReadError, + HttpResponseError, +) +from azure.core.pipeline import Pipeline +from ._base import HttpRequest +from ._base_async import ( + AsyncHttpResponse, + _ResponseStopIteration, + _iterate_response_content, +) +from ._requests_basic import ( + RequestsTransportResponse, + _read_raw_stream, + AzureErrorUnion, +) +from ._base_requests_async import RequestsAsyncTransportBase +from .._tools import is_rest as _is_rest +from .._tools_async import ( + handle_no_stream_rest_response as _handle_no_stream_rest_response, +) + +if TYPE_CHECKING: + from ...rest import ( + HttpRequest as RestHttpRequest, + AsyncHttpResponse as RestAsyncHttpResponse, + ) + +_LOGGER = logging.getLogger(__name__) + + +def _get_running_loop(): + return asyncio.get_running_loop() + + +class AsyncioRequestsTransport(RequestsAsyncTransportBase): + """Identical implementation as the synchronous RequestsTransport wrapped in a class with + asynchronous methods. Uses the built-in asyncio event loop. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_async.py + :start-after: [START asyncio] + :end-before: [END asyncio] + :language: python + :dedent: 4 + :caption: Asynchronous transport with asyncio. + """ + + async def __aenter__(self): + return super(AsyncioRequestsTransport, self).__enter__() + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + return super(AsyncioRequestsTransport, self).__exit__(exc_type, exc_value, traceback) + + async def sleep(self, duration): # pylint:disable=invalid-overridden-method + await asyncio.sleep(duration) + + @overload # type: ignore + async def send( # pylint:disable=invalid-overridden-method + self, request: HttpRequest, *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any + ) -> AsyncHttpResponse: + """Send the request using this HTTP sender. + + :param request: The HttpRequest + :type request: ~azure.core.pipeline.transport.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + + @overload + async def send( # pylint:disable=invalid-overridden-method + self, request: "RestHttpRequest", *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any + ) -> "RestAsyncHttpResponse": + """Send a `azure.core.rest` request using this HTTP sender. + + :param request: The HttpRequest + :type request: ~azure.core.rest.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.rest.AsyncHttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + + async def send( # pylint:disable=invalid-overridden-method + self, + request: Union[HttpRequest, "RestHttpRequest"], + *, + proxies: Optional[MutableMapping[str, str]] = None, + **kwargs + ) -> Union[AsyncHttpResponse, "RestAsyncHttpResponse"]: + """Send the request using this HTTP sender. + + :param request: The HttpRequest + :type request: ~azure.core.pipeline.transport.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + self.open() + loop = kwargs.get("loop", _get_running_loop()) + response = None + error: Optional[AzureErrorUnion] = None + data_to_send = await self._retrieve_request_data(request) + try: + response = await loop.run_in_executor( + None, + functools.partial( + self.session.request, + request.method, + request.url, + headers=request.headers, + data=data_to_send, + files=request.files, + verify=kwargs.pop("connection_verify", self.connection_config.verify), + timeout=kwargs.pop("connection_timeout", self.connection_config.timeout), + cert=kwargs.pop("connection_cert", self.connection_config.cert), + allow_redirects=False, + proxies=proxies, + **kwargs + ), + ) + response.raw.enforce_content_length = True + + except ( + NewConnectionError, + ConnectTimeoutError, + ) as err: + error = ServiceRequestError(err, error=err) + except requests.exceptions.ReadTimeout as err: + error = ServiceResponseError(err, error=err) + except requests.exceptions.ConnectionError as err: + if err.args and isinstance(err.args[0], ProtocolError): + error = ServiceResponseError(err, error=err) + else: + error = ServiceRequestError(err, error=err) + except requests.exceptions.ChunkedEncodingError as err: + msg = err.__str__() + if "IncompleteRead" in msg: + _LOGGER.warning("Incomplete download: %s", err) + error = IncompleteReadError(err, error=err) + else: + _LOGGER.warning("Unable to stream download: %s", err) + error = HttpResponseError(err, error=err) + except requests.RequestException as err: + error = ServiceRequestError(err, error=err) + + if error: + raise error + if _is_rest(request): + from azure.core.rest._requests_asyncio import ( + RestAsyncioRequestsTransportResponse, + ) + + retval = RestAsyncioRequestsTransportResponse( + request=request, + internal_response=response, + block_size=self.connection_config.data_block_size, + ) + if not kwargs.get("stream"): + await _handle_no_stream_rest_response(retval) + return retval + + return AsyncioRequestsTransportResponse(request, response, self.connection_config.data_block_size) + + +class AsyncioStreamDownloadGenerator(AsyncIterator): + """Streams the response body data. + + :param pipeline: The pipeline object + :type pipeline: ~azure.core.pipeline.AsyncPipeline + :param response: The response object. + :type response: ~azure.core.pipeline.transport.AsyncHttpResponse + :keyword bool decompress: If True which is default, will attempt to decode the body based + on the *content-encoding* header. + """ + + def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse, **kwargs) -> None: + self.pipeline = pipeline + self.request = response.request + self.response = response + self.block_size = response.block_size + decompress = kwargs.pop("decompress", True) + if len(kwargs) > 0: + raise TypeError("Got an unexpected keyword argument: {}".format(list(kwargs.keys())[0])) + internal_response = response.internal_response + if decompress: + self.iter_content_func = internal_response.iter_content(self.block_size) + else: + self.iter_content_func = _read_raw_stream(internal_response, self.block_size) + self.content_length = int(response.headers.get("Content-Length", 0)) + + def __len__(self): + return self.content_length + + async def __anext__(self): + loop = _get_running_loop() + internal_response = self.response.internal_response + try: + chunk = await loop.run_in_executor( + None, + _iterate_response_content, + self.iter_content_func, + ) + if not chunk: + raise _ResponseStopIteration() + return chunk + except _ResponseStopIteration: + internal_response.close() + raise StopAsyncIteration() # pylint: disable=raise-missing-from + except requests.exceptions.StreamConsumedError: + raise + except requests.exceptions.ChunkedEncodingError as err: + msg = err.__str__() + if "IncompleteRead" in msg: + _LOGGER.warning("Incomplete download: %s", err) + internal_response.close() + raise IncompleteReadError(err, error=err) from err + _LOGGER.warning("Unable to stream download: %s", err) + internal_response.close() + raise HttpResponseError(err, error=err) from err + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + internal_response.close() + raise + + +class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore + """Asynchronous streaming of data from the response.""" + + def stream_download(self, pipeline, **kwargs) -> AsyncIteratorType[bytes]: # type: ignore + """Generator for streaming request body data. + + :param pipeline: The pipeline object + :type pipeline: ~azure.core.pipeline.AsyncPipeline + :rtype: AsyncIterator[bytes] + :return: An async iterator of bytes chunks + """ + return AsyncioStreamDownloadGenerator(pipeline, self, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_basic.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_basic.py new file mode 100644 index 00000000..7cfe556f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_basic.py @@ -0,0 +1,421 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import logging +from typing import ( + Iterator, + Optional, + Union, + TypeVar, + overload, + TYPE_CHECKING, + MutableMapping, +) +from urllib3.util.retry import Retry +from urllib3.exceptions import ( + DecodeError as CoreDecodeError, + ReadTimeoutError, + ProtocolError, + NewConnectionError, + ConnectTimeoutError, +) +import requests + +from azure.core.configuration import ConnectionConfiguration +from azure.core.exceptions import ( + ServiceRequestError, + ServiceResponseError, + IncompleteReadError, + HttpResponseError, + DecodeError, +) +from . import HttpRequest + +from ._base import HttpTransport, HttpResponse, _HttpResponseBase +from ._bigger_block_size_http_adapters import BiggerBlockSizeHTTPAdapter +from .._tools import ( + is_rest as _is_rest, + handle_non_stream_rest_response as _handle_non_stream_rest_response, +) + +if TYPE_CHECKING: + from ...rest import HttpRequest as RestHttpRequest, HttpResponse as RestHttpResponse + +AzureErrorUnion = Union[ + ServiceRequestError, + ServiceResponseError, + IncompleteReadError, + HttpResponseError, +] + +PipelineType = TypeVar("PipelineType") + +_LOGGER = logging.getLogger(__name__) + + +def _read_raw_stream(response, chunk_size=1): + # Special case for urllib3. + if hasattr(response.raw, "stream"): + try: + yield from response.raw.stream(chunk_size, decode_content=False) + except ProtocolError as e: + raise ServiceResponseError(e, error=e) from e + except CoreDecodeError as e: + raise DecodeError(e, error=e) from e + except ReadTimeoutError as e: + raise ServiceRequestError(e, error=e) from e + else: + # Standard file-like object. + while True: + chunk = response.raw.read(chunk_size) + if not chunk: + break + yield chunk + + # following behavior from requests iter_content, we set content consumed to True + # https://github.com/psf/requests/blob/master/requests/models.py#L774 + response._content_consumed = True # pylint: disable=protected-access + + +class _RequestsTransportResponseBase(_HttpResponseBase): + """Base class for accessing response data. + + :param HttpRequest request: The request. + :param requests_response: The object returned from the HTTP library. + :type requests_response: requests.Response + :param int block_size: Size in bytes. + """ + + def __init__(self, request, requests_response, block_size=None): + super(_RequestsTransportResponseBase, self).__init__(request, requests_response, block_size=block_size) + self.status_code = requests_response.status_code + self.headers = requests_response.headers + self.reason = requests_response.reason + self.content_type = requests_response.headers.get("content-type") + + def body(self): + return self.internal_response.content + + def text(self, encoding: Optional[str] = None) -> str: + """Return the whole body as a string. + + If encoding is not provided, mostly rely on requests auto-detection, except + for BOM, that requests ignores. If we see a UTF8 BOM, we assumes UTF8 unlike requests. + + :param str encoding: The encoding to apply. + :rtype: str + :return: The body as text. + """ + if not encoding: + # There is a few situation where "requests" magic doesn't fit us: + # - https://github.com/psf/requests/issues/654 + # - https://github.com/psf/requests/issues/1737 + # - https://github.com/psf/requests/issues/2086 + from codecs import BOM_UTF8 + + if self.internal_response.content[:3] == BOM_UTF8: + encoding = "utf-8-sig" + + if encoding: + if encoding == "utf-8": + encoding = "utf-8-sig" + + self.internal_response.encoding = encoding + + return self.internal_response.text + + +class StreamDownloadGenerator: + """Generator for streaming response data. + + :param pipeline: The pipeline object + :type pipeline: ~azure.core.pipeline.Pipeline + :param response: The response object. + :type response: ~azure.core.pipeline.transport.HttpResponse + :keyword bool decompress: If True which is default, will attempt to decode the body based + on the *content-encoding* header. + """ + + def __init__(self, pipeline, response, **kwargs): + self.pipeline = pipeline + self.request = response.request + self.response = response + self.block_size = response.block_size + decompress = kwargs.pop("decompress", True) + if len(kwargs) > 0: + raise TypeError("Got an unexpected keyword argument: {}".format(list(kwargs.keys())[0])) + internal_response = response.internal_response + if decompress: + self.iter_content_func = internal_response.iter_content(self.block_size) + else: + self.iter_content_func = _read_raw_stream(internal_response, self.block_size) + self.content_length = int(response.headers.get("Content-Length", 0)) + + def __len__(self): + return self.content_length + + def __iter__(self): + return self + + def __next__(self): + internal_response = self.response.internal_response + try: + chunk = next(self.iter_content_func) + if not chunk: + raise StopIteration() + return chunk + except StopIteration: + internal_response.close() + raise StopIteration() # pylint: disable=raise-missing-from + except requests.exceptions.StreamConsumedError: + raise + except requests.exceptions.ContentDecodingError as err: + raise DecodeError(err, error=err) from err + except requests.exceptions.ChunkedEncodingError as err: + msg = err.__str__() + if "IncompleteRead" in msg: + _LOGGER.warning("Incomplete download: %s", err) + internal_response.close() + raise IncompleteReadError(err, error=err) from err + _LOGGER.warning("Unable to stream download: %s", err) + internal_response.close() + raise HttpResponseError(err, error=err) from err + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + internal_response.close() + raise + + next = __next__ # Python 2 compatibility. + + +class RequestsTransportResponse(HttpResponse, _RequestsTransportResponseBase): + """Streaming of data from the response.""" + + def stream_download(self, pipeline: PipelineType, **kwargs) -> Iterator[bytes]: + """Generator for streaming request body data. + + :param pipeline: The pipeline object + :type pipeline: ~azure.core.pipeline.Pipeline + :rtype: iterator[bytes] + :return: The stream of data + """ + return StreamDownloadGenerator(pipeline, self, **kwargs) + + +class RequestsTransport(HttpTransport): + """Implements a basic requests HTTP sender. + + Since requests team recommends to use one session per requests, you should + not consider this class as thread-safe, since it will use one Session + per instance. + + In this simple implementation: + - You provide the configured session if you want to, or a basic session is created. + - All kwargs received by "send" are sent to session.request directly + + :keyword requests.Session session: Request session to use instead of the default one. + :keyword bool session_owner: Decide if the session provided by user is owned by this transport. Default to True. + :keyword bool use_env_settings: Uses proxy settings from environment. Defaults to True. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_sync.py + :start-after: [START requests] + :end-before: [END requests] + :language: python + :dedent: 4 + :caption: Synchronous transport with Requests. + """ + + _protocols = ["http://", "https://"] + + def __init__(self, **kwargs) -> None: + self.session = kwargs.get("session", None) + self._session_owner = kwargs.get("session_owner", True) + if not self._session_owner and not self.session: + raise ValueError("session_owner cannot be False if no session is provided") + self.connection_config = ConnectionConfiguration(**kwargs) + self._use_env_settings = kwargs.pop("use_env_settings", True) + # See https://github.com/Azure/azure-sdk-for-python/issues/25640 to understand why we track this + self._has_been_opened = False + + def __enter__(self) -> "RequestsTransport": + self.open() + return self + + def __exit__(self, *args): + self.close() + + def _init_session(self, session: requests.Session) -> None: + """Init session level configuration of requests. + + This is initialization I want to do once only on a session. + + :param requests.Session session: The session object. + """ + session.trust_env = self._use_env_settings + disable_retries = Retry(total=False, redirect=False, raise_on_status=False) + adapter = BiggerBlockSizeHTTPAdapter(max_retries=disable_retries) + for p in self._protocols: + session.mount(p, adapter) + + def open(self): + if self._has_been_opened and not self.session: + raise ValueError( + "HTTP transport has already been closed. " + "You may check if you're calling a function outside of the `with` of your client creation, " + "or if you called `close()` on your client already." + ) + if not self.session: + if self._session_owner: + self.session = requests.Session() + self._init_session(self.session) + else: + raise ValueError("session_owner cannot be False and no session is available") + self._has_been_opened = True + + def close(self): + if self._session_owner and self.session: + self.session.close() + self.session = None + + @overload + def send( + self, request: HttpRequest, *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs + ) -> HttpResponse: + """Send a rest request and get back a rest response. + + :param request: The request object to be sent. + :type request: ~azure.core.pipeline.transport.HttpRequest + :return: An HTTPResponse object. + :rtype: ~azure.core.pipeline.transport.HttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + + @overload + def send( + self, request: "RestHttpRequest", *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs + ) -> "RestHttpResponse": + """Send an `azure.core.rest` request and get back a rest response. + + :param request: The request object to be sent. + :type request: ~azure.core.rest.HttpRequest + :return: An HTTPResponse object. + :rtype: ~azure.core.rest.HttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + + def send( # pylint: disable=too-many-statements + self, + request: Union[HttpRequest, "RestHttpRequest"], + *, + proxies: Optional[MutableMapping[str, str]] = None, + **kwargs + ) -> Union[HttpResponse, "RestHttpResponse"]: + """Send request object according to configuration. + + :param request: The request object to be sent. + :type request: ~azure.core.pipeline.transport.HttpRequest + :return: An HTTPResponse object. + :rtype: ~azure.core.pipeline.transport.HttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + self.open() + response = None + error: Optional[AzureErrorUnion] = None + + try: + connection_timeout = kwargs.pop("connection_timeout", self.connection_config.timeout) + + if isinstance(connection_timeout, tuple): + if "read_timeout" in kwargs: + raise ValueError("Cannot set tuple connection_timeout and read_timeout together") + _LOGGER.warning("Tuple timeout setting is deprecated") + timeout = connection_timeout + else: + read_timeout = kwargs.pop("read_timeout", self.connection_config.read_timeout) + timeout = (connection_timeout, read_timeout) + response = self.session.request( # type: ignore + request.method, + request.url, + headers=request.headers, + data=request.data, + files=request.files, + verify=kwargs.pop("connection_verify", self.connection_config.verify), + timeout=timeout, + cert=kwargs.pop("connection_cert", self.connection_config.cert), + allow_redirects=False, + proxies=proxies, + **kwargs + ) + response.raw.enforce_content_length = True + + except AttributeError as err: + if self.session is None: + raise ValueError( + "No session available for request. " + "Please report this issue to https://github.com/Azure/azure-sdk-for-python/issues." + ) from err + raise + except ( + NewConnectionError, + ConnectTimeoutError, + ) as err: + error = ServiceRequestError(err, error=err) + except requests.exceptions.ReadTimeout as err: + error = ServiceResponseError(err, error=err) + except requests.exceptions.ConnectionError as err: + if err.args and isinstance(err.args[0], ProtocolError): + error = ServiceResponseError(err, error=err) + else: + error = ServiceRequestError(err, error=err) + except requests.exceptions.ChunkedEncodingError as err: + msg = err.__str__() + if "IncompleteRead" in msg: + _LOGGER.warning("Incomplete download: %s", err) + error = IncompleteReadError(err, error=err) + else: + _LOGGER.warning("Unable to stream download: %s", err) + error = HttpResponseError(err, error=err) + except requests.RequestException as err: + error = ServiceRequestError(err, error=err) + + if error: + raise error + if _is_rest(request): + from azure.core.rest._requests_basic import RestRequestsTransportResponse + + retval: RestHttpResponse = RestRequestsTransportResponse( + request=request, + internal_response=response, + block_size=self.connection_config.data_block_size, + ) + if not kwargs.get("stream"): + _handle_non_stream_rest_response(retval) + return retval + return RequestsTransportResponse(request, response, self.connection_config.data_block_size) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_trio.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_trio.py new file mode 100644 index 00000000..36d56890 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/transport/_requests_trio.py @@ -0,0 +1,311 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from collections.abc import AsyncIterator +import functools +import logging +from typing import ( + Any, + Optional, + AsyncIterator as AsyncIteratorType, + TYPE_CHECKING, + overload, + Type, + MutableMapping, +) +from types import TracebackType +from urllib3.exceptions import ( + ProtocolError, + NewConnectionError, + ConnectTimeoutError, +) + +import trio + +import requests + +from azure.core.exceptions import ( + ServiceRequestError, + ServiceResponseError, + IncompleteReadError, + HttpResponseError, +) +from azure.core.pipeline import Pipeline +from ._base import HttpRequest +from ._base_async import ( + AsyncHttpResponse, + _ResponseStopIteration, + _iterate_response_content, +) +from ._requests_basic import ( + RequestsTransportResponse, + _read_raw_stream, + AzureErrorUnion, +) +from ._base_requests_async import RequestsAsyncTransportBase +from .._tools import is_rest as _is_rest +from .._tools_async import ( + handle_no_stream_rest_response as _handle_no_stream_rest_response, +) + +if TYPE_CHECKING: + from ...rest import ( + HttpRequest as RestHttpRequest, + AsyncHttpResponse as RestAsyncHttpResponse, + ) + + +_LOGGER = logging.getLogger(__name__) + + +class TrioStreamDownloadGenerator(AsyncIterator): + """Generator for streaming response data. + + :param pipeline: The pipeline object + :type pipeline: ~azure.core.pipeline.AsyncPipeline + :param response: The response object. + :type response: ~azure.core.pipeline.transport.AsyncHttpResponse + :keyword bool decompress: If True which is default, will attempt to decode the body based + on the *content-encoding* header. + """ + + def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse, **kwargs) -> None: + self.pipeline = pipeline + self.request = response.request + self.response = response + self.block_size = response.block_size + decompress = kwargs.pop("decompress", True) + if len(kwargs) > 0: + raise TypeError("Got an unexpected keyword argument: {}".format(list(kwargs.keys())[0])) + internal_response = response.internal_response + if decompress: + self.iter_content_func = internal_response.iter_content(self.block_size) + else: + self.iter_content_func = _read_raw_stream(internal_response, self.block_size) + self.content_length = int(response.headers.get("Content-Length", 0)) + + def __len__(self): + return self.content_length + + async def __anext__(self): + internal_response = self.response.internal_response + try: + try: + chunk = await trio.to_thread.run_sync( + _iterate_response_content, + self.iter_content_func, + ) + except AttributeError: # trio < 0.12.1 + chunk = await trio.run_sync_in_worker_thread( # type: ignore # pylint:disable=no-member + _iterate_response_content, + self.iter_content_func, + ) + if not chunk: + raise _ResponseStopIteration() + return chunk + except _ResponseStopIteration: + internal_response.close() + raise StopAsyncIteration() # pylint: disable=raise-missing-from + except requests.exceptions.StreamConsumedError: + raise + except requests.exceptions.ChunkedEncodingError as err: + msg = err.__str__() + if "IncompleteRead" in msg: + _LOGGER.warning("Incomplete download: %s", err) + internal_response.close() + raise IncompleteReadError(err, error=err) from err + _LOGGER.warning("Unable to stream download: %s", err) + internal_response.close() + raise HttpResponseError(err, error=err) from err + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + internal_response.close() + raise + + +class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore + """Asynchronous streaming of data from the response.""" + + def stream_download(self, pipeline, **kwargs) -> AsyncIteratorType[bytes]: # type: ignore + """Generator for streaming response data. + + :param pipeline: The pipeline object + :type pipeline: ~azure.core.pipeline.AsyncPipeline + :rtype: AsyncIterator[bytes] + :return: An async iterator of bytes chunks + """ + return TrioStreamDownloadGenerator(pipeline, self, **kwargs) + + +class TrioRequestsTransport(RequestsAsyncTransportBase): + """Identical implementation as the synchronous RequestsTransport wrapped in a class with + asynchronous methods. Uses the third party trio event loop. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_async.py + :start-after: [START trio] + :end-before: [END trio] + :language: python + :dedent: 4 + :caption: Asynchronous transport with trio. + """ + + async def __aenter__(self): + return super(TrioRequestsTransport, self).__enter__() + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + return super(TrioRequestsTransport, self).__exit__(exc_type, exc_value, traceback) + + async def sleep(self, duration): # pylint:disable=invalid-overridden-method + await trio.sleep(duration) + + @overload # type: ignore + async def send( # pylint:disable=invalid-overridden-method + self, request: HttpRequest, *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any + ) -> AsyncHttpResponse: + """Send the request using this HTTP sender. + + :param request: The HttpRequest + :type request: ~azure.core.pipeline.transport.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + + @overload + async def send( # pylint:disable=invalid-overridden-method + self, request: "RestHttpRequest", *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any + ) -> "RestAsyncHttpResponse": + """Send an `azure.core.rest` request using this HTTP sender. + + :param request: The HttpRequest + :type request: ~azure.core.rest.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.rest.AsyncHttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + + async def send( + self, request, *, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any + ): # pylint:disable=invalid-overridden-method + """Send the request using this HTTP sender. + + :param request: The HttpRequest + :type request: ~azure.core.pipeline.transport.HttpRequest + :return: The AsyncHttpResponse + :rtype: ~azure.core.pipeline.transport.AsyncHttpResponse + + :keyword MutableMapping proxies: will define the proxy to use. Proxy is a dict (protocol, url) + """ + self.open() + trio_limiter = kwargs.get("trio_limiter", None) + response = None + error: Optional[AzureErrorUnion] = None + data_to_send = await self._retrieve_request_data(request) + try: + try: + response = await trio.to_thread.run_sync( + functools.partial( + self.session.request, + request.method, + request.url, + headers=request.headers, + data=data_to_send, + files=request.files, + verify=kwargs.pop("connection_verify", self.connection_config.verify), + timeout=kwargs.pop("connection_timeout", self.connection_config.timeout), + cert=kwargs.pop("connection_cert", self.connection_config.cert), + allow_redirects=False, + proxies=proxies, + **kwargs + ), + limiter=trio_limiter, + ) + except AttributeError: # trio < 0.12.1 + response = await trio.run_sync_in_worker_thread( # type: ignore # pylint:disable=no-member + functools.partial( + self.session.request, + request.method, + request.url, + headers=request.headers, + data=request.data, + files=request.files, + verify=kwargs.pop("connection_verify", self.connection_config.verify), + timeout=kwargs.pop("connection_timeout", self.connection_config.timeout), + cert=kwargs.pop("connection_cert", self.connection_config.cert), + allow_redirects=False, + proxies=proxies, + **kwargs + ), + limiter=trio_limiter, + ) + response.raw.enforce_content_length = True + + except ( + NewConnectionError, + ConnectTimeoutError, + ) as err: + error = ServiceRequestError(err, error=err) + except requests.exceptions.ReadTimeout as err: + error = ServiceResponseError(err, error=err) + except requests.exceptions.ConnectionError as err: + if err.args and isinstance(err.args[0], ProtocolError): + error = ServiceResponseError(err, error=err) + else: + error = ServiceRequestError(err, error=err) + except requests.exceptions.ChunkedEncodingError as err: + msg = err.__str__() + if "IncompleteRead" in msg: + _LOGGER.warning("Incomplete download: %s", err) + error = IncompleteReadError(err, error=err) + else: + _LOGGER.warning("Unable to stream download: %s", err) + error = HttpResponseError(err, error=err) + except requests.RequestException as err: + error = ServiceRequestError(err, error=err) + + if error: + raise error + if _is_rest(request): + from azure.core.rest._requests_trio import RestTrioRequestsTransportResponse + + retval = RestTrioRequestsTransportResponse( + request=request, + internal_response=response, + block_size=self.connection_config.data_block_size, + ) + if not kwargs.get("stream"): + await _handle_no_stream_rest_response(retval) + return retval + + return TrioRequestsTransportResponse(request, response, self.connection_config.data_block_size) |