diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/core/pipeline/policies')
14 files changed, 3173 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/__init__.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/__init__.py new file mode 100644 index 00000000..c47ee8d6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/__init__.py @@ -0,0 +1,76 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- + +from ._base import HTTPPolicy, SansIOHTTPPolicy, RequestHistory +from ._authentication import ( + BearerTokenCredentialPolicy, + AzureKeyCredentialPolicy, + AzureSasCredentialPolicy, +) +from ._custom_hook import CustomHookPolicy +from ._redirect import RedirectPolicy +from ._retry import RetryPolicy, RetryMode +from ._distributed_tracing import DistributedTracingPolicy +from ._universal import ( + HeadersPolicy, + UserAgentPolicy, + NetworkTraceLoggingPolicy, + ContentDecodePolicy, + ProxyPolicy, + HttpLoggingPolicy, + RequestIdPolicy, +) +from ._base_async import AsyncHTTPPolicy +from ._authentication_async import AsyncBearerTokenCredentialPolicy +from ._redirect_async import AsyncRedirectPolicy +from ._retry_async import AsyncRetryPolicy +from ._sensitive_header_cleanup_policy import SensitiveHeaderCleanupPolicy + +__all__ = [ + "HTTPPolicy", + "SansIOHTTPPolicy", + "BearerTokenCredentialPolicy", + "AzureKeyCredentialPolicy", + "AzureSasCredentialPolicy", + "HeadersPolicy", + "UserAgentPolicy", + "NetworkTraceLoggingPolicy", + "ContentDecodePolicy", + "RetryMode", + "RetryPolicy", + "RedirectPolicy", + "ProxyPolicy", + "CustomHookPolicy", + "DistributedTracingPolicy", + "RequestHistory", + "HttpLoggingPolicy", + "RequestIdPolicy", + "AsyncHTTPPolicy", + "AsyncBearerTokenCredentialPolicy", + "AsyncRedirectPolicy", + "AsyncRetryPolicy", + "SensitiveHeaderCleanupPolicy", +] diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_authentication.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_authentication.py new file mode 100644 index 00000000..53727003 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_authentication.py @@ -0,0 +1,306 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +import time +import base64 +from typing import TYPE_CHECKING, Optional, TypeVar, MutableMapping, Any, Union, cast +from azure.core.credentials import ( + TokenCredential, + SupportsTokenInfo, + TokenRequestOptions, + TokenProvider, +) +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.pipeline.transport import ( + HttpResponse as LegacyHttpResponse, + HttpRequest as LegacyHttpRequest, +) +from azure.core.rest import HttpResponse, HttpRequest +from . import HTTPPolicy, SansIOHTTPPolicy +from ...exceptions import ServiceRequestError +from ._utils import get_challenge_parameter + +if TYPE_CHECKING: + + from azure.core.credentials import ( + AccessToken, + AccessTokenInfo, + AzureKeyCredential, + AzureSasCredential, + ) + +HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) + + +# pylint:disable=too-few-public-methods +class _BearerTokenCredentialPolicyBase: + """Base class for a Bearer Token Credential Policy. + + :param credential: The credential. + :type credential: ~azure.core.credentials.TokenProvider + :param str scopes: Lets you specify the type of access needed. + :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested + tokens. Defaults to False. + """ + + def __init__(self, credential: TokenProvider, *scopes: str, **kwargs: Any) -> None: + super(_BearerTokenCredentialPolicyBase, self).__init__() + self._scopes = scopes + self._credential = credential + self._token: Optional[Union["AccessToken", "AccessTokenInfo"]] = None + self._enable_cae: bool = kwargs.get("enable_cae", False) + + @staticmethod + def _enforce_https(request: PipelineRequest[HTTPRequestType]) -> None: + # move 'enforce_https' from options to context so it persists + # across retries but isn't passed to a transport implementation + option = request.context.options.pop("enforce_https", None) + + # True is the default setting; we needn't preserve an explicit opt in to the default behavior + if option is False: + request.context["enforce_https"] = option + + enforce_https = request.context.get("enforce_https", True) + if enforce_https and not request.http_request.url.lower().startswith("https"): + raise ServiceRequestError( + "Bearer token authentication is not permitted for non-TLS protected (non-https) URLs." + ) + + @staticmethod + def _update_headers(headers: MutableMapping[str, str], token: str) -> None: + """Updates the Authorization header with the bearer token. + + :param MutableMapping[str, str] headers: The HTTP Request headers + :param str token: The OAuth token. + """ + headers["Authorization"] = "Bearer {}".format(token) + + @property + def _need_new_token(self) -> bool: + now = time.time() + refresh_on = getattr(self._token, "refresh_on", None) + return not self._token or (refresh_on and refresh_on <= now) or self._token.expires_on - now < 300 + + def _get_token(self, *scopes: str, **kwargs: Any) -> Union["AccessToken", "AccessTokenInfo"]: + if self._enable_cae: + kwargs.setdefault("enable_cae", self._enable_cae) + + if hasattr(self._credential, "get_token_info"): + options: TokenRequestOptions = {} + # Loop through all the keyword arguments and check if they are part of the TokenRequestOptions. + for key in list(kwargs.keys()): + if key in TokenRequestOptions.__annotations__: # pylint: disable=no-member + options[key] = kwargs.pop(key) # type: ignore[literal-required] + + return cast(SupportsTokenInfo, self._credential).get_token_info(*scopes, options=options) + return cast(TokenCredential, self._credential).get_token(*scopes, **kwargs) + + def _request_token(self, *scopes: str, **kwargs: Any) -> None: + """Request a new token from the credential. + + This will call the credential's appropriate method to get a token and store it in the policy. + + :param str scopes: The type of access needed. + """ + self._token = self._get_token(*scopes, **kwargs) + + +class BearerTokenCredentialPolicy(_BearerTokenCredentialPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]): + """Adds a bearer token Authorization header to requests. + + :param credential: The credential. + :type credential: ~azure.core.TokenCredential + :param str scopes: Lets you specify the type of access needed. + :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested + tokens. Defaults to False. + :raises: :class:`~azure.core.exceptions.ServiceRequestError` + """ + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Called before the policy sends a request. + + The base implementation authorizes the request with a bearer token. + + :param ~azure.core.pipeline.PipelineRequest request: the request + """ + self._enforce_https(request) + + if self._token is None or self._need_new_token: + self._request_token(*self._scopes) + bearer_token = cast(Union["AccessToken", "AccessTokenInfo"], self._token).token + self._update_headers(request.http_request.headers, bearer_token) + + def authorize_request(self, request: PipelineRequest[HTTPRequestType], *scopes: str, **kwargs: Any) -> None: + """Acquire a token from the credential and authorize the request with it. + + Keyword arguments are passed to the credential's get_token method. The token will be cached and used to + authorize future requests. + + :param ~azure.core.pipeline.PipelineRequest request: the request + :param str scopes: required scopes of authentication + """ + self._request_token(*scopes, **kwargs) + bearer_token = cast(Union["AccessToken", "AccessTokenInfo"], self._token).token + self._update_headers(request.http_request.headers, bearer_token) + + def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]: + """Authorize request with a bearer token and send it to the next policy + + :param request: The pipeline request object + :type request: ~azure.core.pipeline.PipelineRequest + :return: The pipeline response object + :rtype: ~azure.core.pipeline.PipelineResponse + """ + self.on_request(request) + try: + response = self.next.send(request) + except Exception: + self.on_exception(request) + raise + + self.on_response(request, response) + if response.http_response.status_code == 401: + self._token = None # any cached token is invalid + if "WWW-Authenticate" in response.http_response.headers: + request_authorized = self.on_challenge(request, response) + if request_authorized: + # if we receive a challenge response, we retrieve a new token + # which matches the new target. In this case, we don't want to remove + # token from the request so clear the 'insecure_domain_change' tag + request.context.options.pop("insecure_domain_change", False) + try: + response = self.next.send(request) + self.on_response(request, response) + except Exception: + self.on_exception(request) + raise + + return response + + def on_challenge( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> bool: + """Authorize request according to an authentication challenge + + This method is called when the resource provider responds 401 with a WWW-Authenticate header. + + :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge + :param ~azure.core.pipeline.PipelineResponse response: the resource provider's response + :returns: a bool indicating whether the policy should send the request + :rtype: bool + """ + # pylint:disable=unused-argument + headers = response.http_response.headers + error = get_challenge_parameter(headers, "Bearer", "error") + if error == "insufficient_claims": + encoded_claims = get_challenge_parameter(headers, "Bearer", "claims") + if not encoded_claims: + return False + try: + padding_needed = -len(encoded_claims) % 4 + claims = base64.urlsafe_b64decode(encoded_claims + "=" * padding_needed).decode("utf-8") + if claims: + token = self._get_token(*self._scopes, claims=claims) + bearer_token = cast(Union["AccessToken", "AccessTokenInfo"], token).token + request.http_request.headers["Authorization"] = "Bearer " + bearer_token + return True + except Exception: # pylint:disable=broad-except + return False + return False + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> None: + """Executed after the request comes back from the next policy. + + :param request: Request to be modified after returning from the policy. + :type request: ~azure.core.pipeline.PipelineRequest + :param response: Pipeline response object + :type response: ~azure.core.pipeline.PipelineResponse + """ + + def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Executed when an exception is raised while executing the next policy. + + This method is executed inside the exception handler. + + :param request: The Pipeline request object + :type request: ~azure.core.pipeline.PipelineRequest + """ + # pylint: disable=unused-argument + return + + +class AzureKeyCredentialPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """Adds a key header for the provided credential. + + :param credential: The credential used to authenticate requests. + :type credential: ~azure.core.credentials.AzureKeyCredential + :param str name: The name of the key header used for the credential. + :keyword str prefix: The name of the prefix for the header value if any. + :raises: ValueError or TypeError + """ + + def __init__( # pylint: disable=unused-argument + self, + credential: "AzureKeyCredential", + name: str, + *, + prefix: Optional[str] = None, + **kwargs: Any, + ) -> None: + super().__init__() + if not hasattr(credential, "key"): + raise TypeError("String is not a supported credential input type. Use an instance of AzureKeyCredential.") + if not name: + raise ValueError("name can not be None or empty") + if not isinstance(name, str): + raise TypeError("name must be a string.") + self._credential = credential + self._name = name + self._prefix = prefix + " " if prefix else "" + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + request.http_request.headers[self._name] = f"{self._prefix}{self._credential.key}" + + +class AzureSasCredentialPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """Adds a shared access signature to query for the provided credential. + + :param credential: The credential used to authenticate requests. + :type credential: ~azure.core.credentials.AzureSasCredential + :raises: ValueError or TypeError + """ + + def __init__( + self, # pylint: disable=unused-argument + credential: "AzureSasCredential", + **kwargs: Any, + ) -> None: + super(AzureSasCredentialPolicy, self).__init__() + if not credential: + raise ValueError("credential can not be None") + self._credential = credential + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + url = request.http_request.url + query = request.http_request.query + signature = self._credential.signature + if signature.startswith("?"): + signature = signature[1:] + if query: + if signature not in url: + url = url + "&" + signature + else: + if url.endswith("?"): + url = url + signature + else: + url = url + "?" + signature + request.http_request.url = url diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_authentication_async.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_authentication_async.py new file mode 100644 index 00000000..f97b8df3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_authentication_async.py @@ -0,0 +1,219 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +import time +import base64 +from typing import Any, Awaitable, Optional, cast, TypeVar, Union + +from azure.core.credentials import AccessToken, AccessTokenInfo, TokenRequestOptions +from azure.core.credentials_async import ( + AsyncTokenCredential, + AsyncSupportsTokenInfo, + AsyncTokenProvider, +) +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.pipeline.policies import AsyncHTTPPolicy +from azure.core.pipeline.policies._authentication import ( + _BearerTokenCredentialPolicyBase, +) +from azure.core.pipeline.transport import ( + AsyncHttpResponse as LegacyAsyncHttpResponse, + HttpRequest as LegacyHttpRequest, +) +from azure.core.rest import AsyncHttpResponse, HttpRequest +from azure.core.utils._utils import get_running_async_lock +from ._utils import get_challenge_parameter + +from .._tools_async import await_result + +AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType", AsyncHttpResponse, LegacyAsyncHttpResponse) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) + + +class AsyncBearerTokenCredentialPolicy(AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]): + """Adds a bearer token Authorization header to requests. + + :param credential: The credential. + :type credential: ~azure.core.credentials_async.AsyncTokenProvider + :param str scopes: Lets you specify the type of access needed. + :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested + tokens. Defaults to False. + """ + + def __init__(self, credential: AsyncTokenProvider, *scopes: str, **kwargs: Any) -> None: + super().__init__() + self._credential = credential + self._scopes = scopes + self._lock_instance = None + self._token: Optional[Union["AccessToken", "AccessTokenInfo"]] = None + self._enable_cae: bool = kwargs.get("enable_cae", False) + + @property + def _lock(self): + if self._lock_instance is None: + self._lock_instance = get_running_async_lock() + return self._lock_instance + + async def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Adds a bearer token Authorization header to request and sends request to next policy. + + :param request: The pipeline request object to be modified. + :type request: ~azure.core.pipeline.PipelineRequest + :raises: :class:`~azure.core.exceptions.ServiceRequestError` + """ + _BearerTokenCredentialPolicyBase._enforce_https(request) # pylint:disable=protected-access + + if self._token is None or self._need_new_token(): + async with self._lock: + # double check because another coroutine may have acquired a token while we waited to acquire the lock + if self._token is None or self._need_new_token(): + await self._request_token(*self._scopes) + bearer_token = cast(Union[AccessToken, AccessTokenInfo], self._token).token + request.http_request.headers["Authorization"] = "Bearer " + bearer_token + + async def authorize_request(self, request: PipelineRequest[HTTPRequestType], *scopes: str, **kwargs: Any) -> None: + """Acquire a token from the credential and authorize the request with it. + + Keyword arguments are passed to the credential's get_token method. The token will be cached and used to + authorize future requests. + + :param ~azure.core.pipeline.PipelineRequest request: the request + :param str scopes: required scopes of authentication + """ + + async with self._lock: + await self._request_token(*scopes, **kwargs) + bearer_token = cast(Union[AccessToken, AccessTokenInfo], self._token).token + request.http_request.headers["Authorization"] = "Bearer " + bearer_token + + async def send( + self, request: PipelineRequest[HTTPRequestType] + ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]: + """Authorize request with a bearer token and send it to the next policy + + :param request: The pipeline request object + :type request: ~azure.core.pipeline.PipelineRequest + :return: The pipeline response object + :rtype: ~azure.core.pipeline.PipelineResponse + """ + await await_result(self.on_request, request) + response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType] + try: + response = await self.next.send(request) + except Exception: + await await_result(self.on_exception, request) + raise + await await_result(self.on_response, request, response) + + if response.http_response.status_code == 401: + self._token = None # any cached token is invalid + if "WWW-Authenticate" in response.http_response.headers: + request_authorized = await self.on_challenge(request, response) + if request_authorized: + # if we receive a challenge response, we retrieve a new token + # which matches the new target. In this case, we don't want to remove + # token from the request so clear the 'insecure_domain_change' tag + request.context.options.pop("insecure_domain_change", False) + try: + response = await self.next.send(request) + except Exception: + await await_result(self.on_exception, request) + raise + await await_result(self.on_response, request, response) + + return response + + async def on_challenge( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType], + ) -> bool: + """Authorize request according to an authentication challenge + + This method is called when the resource provider responds 401 with a WWW-Authenticate header. + + :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge + :param ~azure.core.pipeline.PipelineResponse response: the resource provider's response + :returns: a bool indicating whether the policy should send the request + :rtype: bool + """ + # pylint:disable=unused-argument + headers = response.http_response.headers + error = get_challenge_parameter(headers, "Bearer", "error") + if error == "insufficient_claims": + encoded_claims = get_challenge_parameter(headers, "Bearer", "claims") + if not encoded_claims: + return False + try: + padding_needed = -len(encoded_claims) % 4 + claims = base64.urlsafe_b64decode(encoded_claims + "=" * padding_needed).decode("utf-8") + if claims: + token = await self._get_token(*self._scopes, claims=claims) + bearer_token = cast(Union["AccessToken", "AccessTokenInfo"], token).token + request.http_request.headers["Authorization"] = "Bearer " + bearer_token + return True + except Exception: # pylint:disable=broad-except + return False + return False + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType], + ) -> Optional[Awaitable[None]]: + """Executed after the request comes back from the next policy. + + :param request: Request to be modified after returning from the policy. + :type request: ~azure.core.pipeline.PipelineRequest + :param response: Pipeline response object + :type response: ~azure.core.pipeline.PipelineResponse + """ + + def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Executed when an exception is raised while executing the next policy. + + This method is executed inside the exception handler. + + :param request: The Pipeline request object + :type request: ~azure.core.pipeline.PipelineRequest + """ + # pylint: disable=unused-argument + return + + def _need_new_token(self) -> bool: + now = time.time() + refresh_on = getattr(self._token, "refresh_on", None) + return not self._token or (refresh_on and refresh_on <= now) or self._token.expires_on - now < 300 + + async def _get_token(self, *scopes: str, **kwargs: Any) -> Union["AccessToken", "AccessTokenInfo"]: + if self._enable_cae: + kwargs.setdefault("enable_cae", self._enable_cae) + + if hasattr(self._credential, "get_token_info"): + options: TokenRequestOptions = {} + # Loop through all the keyword arguments and check if they are part of the TokenRequestOptions. + for key in list(kwargs.keys()): + if key in TokenRequestOptions.__annotations__: # pylint: disable=no-member + options[key] = kwargs.pop(key) # type: ignore[literal-required] + + return await await_result( + cast(AsyncSupportsTokenInfo, self._credential).get_token_info, + *scopes, + options=options, + ) + return await await_result( + cast(AsyncTokenCredential, self._credential).get_token, + *scopes, + **kwargs, + ) + + async def _request_token(self, *scopes: str, **kwargs: Any) -> None: + """Request a new token from the credential. + + This will call the credential's appropriate method to get a token and store it in the policy. + + :param str scopes: The type of access needed. + """ + self._token = await self._get_token(*scopes, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_base.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_base.py new file mode 100644 index 00000000..fce407a0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_base.py @@ -0,0 +1,140 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- + +import abc +import copy +import logging + +from typing import ( + Generic, + TypeVar, + Union, + Any, + Optional, + Awaitable, + Dict, +) + +from azure.core.pipeline import PipelineRequest, PipelineResponse + +HTTPResponseType = TypeVar("HTTPResponseType") +HTTPRequestType = TypeVar("HTTPRequestType") + +_LOGGER = logging.getLogger(__name__) + + +class HTTPPolicy(abc.ABC, Generic[HTTPRequestType, HTTPResponseType]): + """An HTTP policy ABC. + + Use with a synchronous pipeline. + """ + + next: "HTTPPolicy[HTTPRequestType, HTTPResponseType]" + """Pointer to the next policy or a transport (wrapped as a policy). Will be set at pipeline creation.""" + + @abc.abstractmethod + def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]: + """Abstract send method for a synchronous pipeline. Mutates the request. + + Context content is dependent on the HttpTransport. + + :param request: The pipeline request object + :type request: ~azure.core.pipeline.PipelineRequest + :return: The pipeline response object. + :rtype: ~azure.core.pipeline.PipelineResponse + """ + + +class SansIOHTTPPolicy(Generic[HTTPRequestType, HTTPResponseType]): + """Represents a sans I/O policy. + + SansIOHTTPPolicy is a base class for policies that only modify or + mutate a request based on the HTTP specification, and do not depend + on the specifics of any particular transport. SansIOHTTPPolicy + subclasses will function in either a Pipeline or an AsyncPipeline, + and can act either before the request is done, or after. + You can optionally make these methods coroutines (or return awaitable objects) + but they will then be tied to AsyncPipeline usage. + """ + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> Union[None, Awaitable[None]]: + """Is executed before sending the request from next policy. + + :param request: Request to be modified before sent from next policy. + :type request: ~azure.core.pipeline.PipelineRequest + """ + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> Union[None, Awaitable[None]]: + """Is executed after the request comes back from the policy. + + :param request: Request to be modified after returning from the policy. + :type request: ~azure.core.pipeline.PipelineRequest + :param response: Pipeline response object + :type response: ~azure.core.pipeline.PipelineResponse + """ + + def on_exception( + self, + request: PipelineRequest[HTTPRequestType], # pylint: disable=unused-argument + ) -> None: + """Is executed if an exception is raised while executing the next policy. + + This method is executed inside the exception handler. + + :param request: The Pipeline request object + :type request: ~azure.core.pipeline.PipelineRequest + """ + return + + +class RequestHistory(Generic[HTTPRequestType, HTTPResponseType]): + """A container for an attempted request and the applicable response. + + This is used to document requests/responses that resulted in redirected/retried requests. + + :param http_request: The request. + :type http_request: ~azure.core.pipeline.transport.HttpRequest + :param http_response: The HTTP response. + :type http_response: ~azure.core.pipeline.transport.HttpResponse + :param Exception error: An error encountered during the request, or None if the response was received successfully. + :param dict context: The pipeline context. + """ + + def __init__( + self, + http_request: HTTPRequestType, + http_response: Optional[HTTPResponseType] = None, + error: Optional[Exception] = None, + context: Optional[Dict[str, Any]] = None, + ) -> None: + self.http_request: HTTPRequestType = copy.deepcopy(http_request) + self.http_response: Optional[HTTPResponseType] = http_response + self.error: Optional[Exception] = error + self.context: Optional[Dict[str, Any]] = context diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_base_async.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_base_async.py new file mode 100644 index 00000000..eb0a866a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_base_async.py @@ -0,0 +1,57 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import abc + +from typing import Generic, TypeVar +from .. import PipelineRequest, PipelineResponse + +AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType") +HTTPResponseType = TypeVar("HTTPResponseType") +HTTPRequestType = TypeVar("HTTPRequestType") + + +class AsyncHTTPPolicy(abc.ABC, Generic[HTTPRequestType, AsyncHTTPResponseType]): + """An async HTTP policy ABC. + + Use with an asynchronous pipeline. + """ + + next: "AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]" + """Pointer to the next policy or a transport (wrapped as a policy). Will be set at pipeline creation.""" + + @abc.abstractmethod + async def send( + self, request: PipelineRequest[HTTPRequestType] + ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]: + """Abstract send method for a asynchronous pipeline. Mutates the request. + + Context content is dependent on the HttpTransport. + + :param request: The pipeline request object. + :type request: ~azure.core.pipeline.PipelineRequest + :return: The pipeline response object. + :rtype: ~azure.core.pipeline.PipelineResponse + """ diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_custom_hook.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_custom_hook.py new file mode 100644 index 00000000..87973e4e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_custom_hook.py @@ -0,0 +1,84 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import TypeVar, Any +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.pipeline.transport import ( + HttpResponse as LegacyHttpResponse, + HttpRequest as LegacyHttpRequest, +) +from azure.core.rest import HttpResponse, HttpRequest +from ._base import SansIOHTTPPolicy + +HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) + + +class CustomHookPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """A simple policy that enable the given callback + with the response. + + :keyword callback raw_request_hook: Callback function. Will be invoked on request. + :keyword callback raw_response_hook: Callback function. Will be invoked on response. + """ + + def __init__(self, **kwargs: Any): + self._request_callback = kwargs.get("raw_request_hook") + self._response_callback = kwargs.get("raw_response_hook") + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """This is executed before sending the request to the next policy. + + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + """ + request_callback = request.context.options.pop("raw_request_hook", None) + if request_callback: + request.context["raw_request_hook"] = request_callback + request_callback(request) + elif self._request_callback: + self._request_callback(request) + + response_callback = request.context.options.pop("raw_response_hook", None) + if response_callback: + request.context["raw_response_hook"] = response_callback + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> None: + """This is executed after the request comes back from the policy. + + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + :param response: The PipelineResponse object. + :type response: ~azure.core.pipeline.PipelineResponse + """ + response_callback = response.context.get("raw_response_hook") + if response_callback: + response_callback(response) + elif self._response_callback: + self._response_callback(response) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_distributed_tracing.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_distributed_tracing.py new file mode 100644 index 00000000..d049881d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_distributed_tracing.py @@ -0,0 +1,153 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# -------------------------------------------------------------------------- +"""Traces network calls using the implementation library from the settings.""" +import logging +import sys +import urllib.parse +from typing import TYPE_CHECKING, Optional, Tuple, TypeVar, Union, Any, Type +from types import TracebackType + +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.pipeline.policies import SansIOHTTPPolicy +from azure.core.pipeline.transport import ( + HttpResponse as LegacyHttpResponse, + HttpRequest as LegacyHttpRequest, +) +from azure.core.rest import HttpResponse, HttpRequest +from azure.core.settings import settings +from azure.core.tracing import SpanKind + +if TYPE_CHECKING: + from azure.core.tracing._abstract_span import ( + AbstractSpan, + ) + +HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) +ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType] +OptExcInfo = Union[ExcInfo, Tuple[None, None, None]] + +_LOGGER = logging.getLogger(__name__) + + +def _default_network_span_namer(http_request: HTTPRequestType) -> str: + """Extract the path to be used as network span name. + + :param http_request: The HTTP request + :type http_request: ~azure.core.pipeline.transport.HttpRequest + :returns: The string to use as network span name + :rtype: str + """ + path = urllib.parse.urlparse(http_request.url).path + if not path: + path = "/" + return path + + +class DistributedTracingPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """The policy to create spans for Azure calls. + + :keyword network_span_namer: A callable to customize the span name + :type network_span_namer: callable[[~azure.core.pipeline.transport.HttpRequest], str] + :keyword tracing_attributes: Attributes to set on all created spans + :type tracing_attributes: dict[str, str] + """ + + TRACING_CONTEXT = "TRACING_CONTEXT" + _REQUEST_ID = "x-ms-client-request-id" + _RESPONSE_ID = "x-ms-request-id" + _HTTP_RESEND_COUNT = "http.request.resend_count" + + def __init__(self, **kwargs: Any): + self._network_span_namer = kwargs.get("network_span_namer", _default_network_span_namer) + self._tracing_attributes = kwargs.get("tracing_attributes", {}) + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + ctxt = request.context.options + try: + span_impl_type = settings.tracing_implementation() + if span_impl_type is None: + return + + namer = ctxt.pop("network_span_namer", self._network_span_namer) + tracing_attributes = ctxt.pop("tracing_attributes", self._tracing_attributes) + span_name = namer(request.http_request) + + span = span_impl_type(name=span_name, kind=SpanKind.CLIENT) + for attr, value in tracing_attributes.items(): + span.add_attribute(attr, value) + span.start() + + headers = span.to_header() + request.http_request.headers.update(headers) + + request.context[self.TRACING_CONTEXT] = span + except Exception as err: # pylint: disable=broad-except + _LOGGER.warning("Unable to start network span: %s", err) + + def end_span( + self, + request: PipelineRequest[HTTPRequestType], + response: Optional[HTTPResponseType] = None, + exc_info: Optional[OptExcInfo] = None, + ) -> None: + """Ends the span that is tracing the network and updates its status. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + :param response: The HttpResponse object + :type response: ~azure.core.rest.HTTPResponse or ~azure.core.pipeline.transport.HttpResponse + :param exc_info: The exception information + :type exc_info: tuple + """ + if self.TRACING_CONTEXT not in request.context: + return + + span: "AbstractSpan" = request.context[self.TRACING_CONTEXT] + http_request: Union[HttpRequest, LegacyHttpRequest] = request.http_request + if span is not None: + span.set_http_attributes(http_request, response=response) + if request.context.get("retry_count"): + span.add_attribute(self._HTTP_RESEND_COUNT, request.context["retry_count"]) + request_id = http_request.headers.get(self._REQUEST_ID) + if request_id is not None: + span.add_attribute(self._REQUEST_ID, request_id) + if response and self._RESPONSE_ID in response.headers: + span.add_attribute(self._RESPONSE_ID, response.headers[self._RESPONSE_ID]) + if exc_info: + span.__exit__(*exc_info) + else: + span.finish() + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> None: + self.end_span(request, response=response.http_response) + + def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None: + self.end_span(request, exc_info=sys.exc_info()) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_redirect.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_redirect.py new file mode 100644 index 00000000..f7daf4e8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_redirect.py @@ -0,0 +1,218 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +""" +This module is the requests implementation of Pipeline ABC +""" +import logging +from urllib.parse import urlparse +from typing import Optional, TypeVar, Dict, Any, Union, Type +from typing_extensions import Literal + +from azure.core.exceptions import TooManyRedirectsError +from azure.core.pipeline import PipelineResponse, PipelineRequest +from azure.core.pipeline.transport import ( + HttpResponse as LegacyHttpResponse, + HttpRequest as LegacyHttpRequest, + AsyncHttpResponse as LegacyAsyncHttpResponse, +) +from azure.core.rest import HttpResponse, HttpRequest, AsyncHttpResponse +from ._base import HTTPPolicy, RequestHistory +from ._utils import get_domain + +HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse) +AllHttpResponseType = TypeVar( + "AllHttpResponseType", + HttpResponse, + LegacyHttpResponse, + AsyncHttpResponse, + LegacyAsyncHttpResponse, +) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) +ClsRedirectPolicy = TypeVar("ClsRedirectPolicy", bound="RedirectPolicyBase") + +_LOGGER = logging.getLogger(__name__) + + +def domain_changed(original_domain: Optional[str], url: str) -> bool: + """Checks if the domain has changed. + :param str original_domain: The original domain. + :param str url: The new url. + :rtype: bool + :return: Whether the domain has changed. + """ + domain = get_domain(url) + if not original_domain: + return False + if original_domain == domain: + return False + return True + + +class RedirectPolicyBase: + + REDIRECT_STATUSES = frozenset([300, 301, 302, 303, 307, 308]) + + REDIRECT_HEADERS_BLACKLIST = frozenset(["Authorization"]) + + def __init__(self, **kwargs: Any) -> None: + self.allow: bool = kwargs.get("permit_redirects", True) + self.max_redirects: int = kwargs.get("redirect_max", 30) + + remove_headers = set(kwargs.get("redirect_remove_headers", [])) + self._remove_headers_on_redirect = remove_headers.union(self.REDIRECT_HEADERS_BLACKLIST) + redirect_status = set(kwargs.get("redirect_on_status_codes", [])) + self._redirect_on_status_codes = redirect_status.union(self.REDIRECT_STATUSES) + super(RedirectPolicyBase, self).__init__() + + @classmethod + def no_redirects(cls: Type[ClsRedirectPolicy]) -> ClsRedirectPolicy: + """Disable redirects. + + :return: A redirect policy with redirects disabled. + :rtype: ~azure.core.pipeline.policies.RedirectPolicy or ~azure.core.pipeline.policies.AsyncRedirectPolicy + """ + return cls(permit_redirects=False) + + def configure_redirects(self, options: Dict[str, Any]) -> Dict[str, Any]: + """Configures the redirect settings. + + :param options: Keyword arguments from context. + :type options: dict + :return: A dict containing redirect settings and a history of redirects. + :rtype: dict + """ + return { + "allow": options.pop("permit_redirects", self.allow), + "redirects": options.pop("redirect_max", self.max_redirects), + "history": [], + } + + def get_redirect_location( + self, response: PipelineResponse[Any, AllHttpResponseType] + ) -> Union[str, None, Literal[False]]: + """Checks for redirect status code and gets redirect location. + + :param response: The PipelineResponse object + :type response: ~azure.core.pipeline.PipelineResponse + :return: Truthy redirect location string if we got a redirect status + code and valid location. ``None`` if redirect status and no + location. ``False`` if not a redirect status code. + :rtype: str or bool or None + """ + if response.http_response.status_code in [301, 302]: + if response.http_request.method in [ + "GET", + "HEAD", + ]: + return response.http_response.headers.get("location") + return False + if response.http_response.status_code in self._redirect_on_status_codes: + return response.http_response.headers.get("location") + + return False + + def increment( + self, + settings: Dict[str, Any], + response: PipelineResponse[Any, AllHttpResponseType], + redirect_location: str, + ) -> bool: + """Increment the redirect attempts for this request. + + :param dict settings: The redirect settings + :param response: A pipeline response object. + :type response: ~azure.core.pipeline.PipelineResponse + :param str redirect_location: The redirected endpoint. + :return: Whether further redirect attempts are remaining. + False if exhausted; True if more redirect attempts available. + :rtype: bool + """ + # TODO: Revise some of the logic here. + settings["redirects"] -= 1 + settings["history"].append(RequestHistory(response.http_request, http_response=response.http_response)) + + redirected = urlparse(redirect_location) + if not redirected.netloc: + base_url = urlparse(response.http_request.url) + response.http_request.url = "{}://{}/{}".format( + base_url.scheme, base_url.netloc, redirect_location.lstrip("/") + ) + else: + response.http_request.url = redirect_location + if response.http_response.status_code == 303: + response.http_request.method = "GET" + for non_redirect_header in self._remove_headers_on_redirect: + response.http_request.headers.pop(non_redirect_header, None) + return settings["redirects"] >= 0 + + +class RedirectPolicy(RedirectPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]): + """A redirect policy. + + A redirect policy in the pipeline can be configured directly or per operation. + + :keyword bool permit_redirects: Whether the client allows redirects. Defaults to True. + :keyword int redirect_max: The maximum allowed redirects. Defaults to 30. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_sync.py + :start-after: [START redirect_policy] + :end-before: [END redirect_policy] + :language: python + :dedent: 4 + :caption: Configuring a redirect policy. + """ + + def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]: + """Sends the PipelineRequest object to the next policy. + Uses redirect settings to send request to redirect endpoint if necessary. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + :return: Returns the PipelineResponse or raises error if maximum redirects exceeded. + :rtype: ~azure.core.pipeline.PipelineResponse + :raises: ~azure.core.exceptions.TooManyRedirectsError if maximum redirects exceeded. + """ + retryable: bool = True + redirect_settings = self.configure_redirects(request.context.options) + original_domain = get_domain(request.http_request.url) if redirect_settings["allow"] else None + while retryable: + response = self.next.send(request) + redirect_location = self.get_redirect_location(response) + if redirect_location and redirect_settings["allow"]: + retryable = self.increment(redirect_settings, response, redirect_location) + request.http_request = response.http_request + if domain_changed(original_domain, request.http_request.url): + # "insecure_domain_change" is used to indicate that a redirect + # has occurred to a different domain. This tells the SensitiveHeaderCleanupPolicy + # to clean up sensitive headers. We need to remove it before sending the request + # to the transport layer. + request.context.options["insecure_domain_change"] = True + continue + return response + + raise TooManyRedirectsError(redirect_settings["history"]) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_redirect_async.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_redirect_async.py new file mode 100644 index 00000000..073e8adf --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_redirect_async.py @@ -0,0 +1,90 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import TypeVar +from azure.core.exceptions import TooManyRedirectsError +from azure.core.pipeline import PipelineResponse, PipelineRequest +from azure.core.pipeline.transport import ( + AsyncHttpResponse as LegacyAsyncHttpResponse, + HttpRequest as LegacyHttpRequest, +) +from azure.core.rest import AsyncHttpResponse, HttpRequest +from . import AsyncHTTPPolicy +from ._redirect import RedirectPolicyBase, domain_changed +from ._utils import get_domain + +AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType", AsyncHttpResponse, LegacyAsyncHttpResponse) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) + + +class AsyncRedirectPolicy(RedirectPolicyBase, AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]): + """An async redirect policy. + + An async redirect policy in the pipeline can be configured directly or per operation. + + :keyword bool permit_redirects: Whether the client allows redirects. Defaults to True. + :keyword int redirect_max: The maximum allowed redirects. Defaults to 30. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_async.py + :start-after: [START async_redirect_policy] + :end-before: [END async_redirect_policy] + :language: python + :dedent: 4 + :caption: Configuring an async redirect policy. + """ + + async def send( + self, request: PipelineRequest[HTTPRequestType] + ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]: + """Sends the PipelineRequest object to the next policy. + Uses redirect settings to send the request to redirect endpoint if necessary. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + :return: Returns the PipelineResponse or raises error if maximum redirects exceeded. + :rtype: ~azure.core.pipeline.PipelineResponse + :raises: ~azure.core.exceptions.TooManyRedirectsError if maximum redirects exceeded. + """ + redirects_remaining = True + redirect_settings = self.configure_redirects(request.context.options) + original_domain = get_domain(request.http_request.url) if redirect_settings["allow"] else None + while redirects_remaining: + response = await self.next.send(request) + redirect_location = self.get_redirect_location(response) + if redirect_location and redirect_settings["allow"]: + redirects_remaining = self.increment(redirect_settings, response, redirect_location) + request.http_request = response.http_request + if domain_changed(original_domain, request.http_request.url): + # "insecure_domain_change" is used to indicate that a redirect + # has occurred to a different domain. This tells the SensitiveHeaderCleanupPolicy + # to clean up sensitive headers. We need to remove it before sending the request + # to the transport layer. + request.context.options["insecure_domain_change"] = True + continue + return response + + raise TooManyRedirectsError(redirect_settings["history"]) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_retry.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_retry.py new file mode 100644 index 00000000..4021a373 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_retry.py @@ -0,0 +1,582 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import TypeVar, Any, Dict, Optional, Type, List, Union, cast, IO +from io import SEEK_SET, UnsupportedOperation +import logging +import time +from enum import Enum +from azure.core.configuration import ConnectionConfiguration +from azure.core.pipeline import PipelineResponse, PipelineRequest, PipelineContext +from azure.core.pipeline.transport import ( + HttpResponse as LegacyHttpResponse, + AsyncHttpResponse as LegacyAsyncHttpResponse, + HttpRequest as LegacyHttpRequest, + HttpTransport, +) +from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest +from azure.core.exceptions import ( + AzureError, + ClientAuthenticationError, + ServiceResponseError, + ServiceRequestError, + ServiceRequestTimeoutError, + ServiceResponseTimeoutError, +) + +from ._base import HTTPPolicy, RequestHistory +from . import _utils +from ..._enum_meta import CaseInsensitiveEnumMeta + +HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse) +AllHttpResponseType = TypeVar( + "AllHttpResponseType", + HttpResponse, + LegacyHttpResponse, + AsyncHttpResponse, + LegacyAsyncHttpResponse, +) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) +ClsRetryPolicy = TypeVar("ClsRetryPolicy", bound="RetryPolicyBase") + +_LOGGER = logging.getLogger(__name__) + + +class RetryMode(str, Enum, metaclass=CaseInsensitiveEnumMeta): + # pylint: disable=enum-must-be-uppercase + Exponential = "exponential" + Fixed = "fixed" + + +class RetryPolicyBase: + # pylint: disable=too-many-instance-attributes + #: Maximum backoff time. + BACKOFF_MAX = 120 + _SAFE_CODES = set(range(506)) - set([408, 429, 500, 502, 503, 504]) + _RETRY_CODES = set(range(999)) - _SAFE_CODES + + def __init__(self, **kwargs: Any) -> None: + self.total_retries: int = kwargs.pop("retry_total", 10) + self.connect_retries: int = kwargs.pop("retry_connect", 3) + self.read_retries: int = kwargs.pop("retry_read", 3) + self.status_retries: int = kwargs.pop("retry_status", 3) + self.backoff_factor: float = kwargs.pop("retry_backoff_factor", 0.8) + self.backoff_max: int = kwargs.pop("retry_backoff_max", self.BACKOFF_MAX) + self.retry_mode: RetryMode = kwargs.pop("retry_mode", RetryMode.Exponential) + self.timeout: int = kwargs.pop("timeout", 604800) + + retry_codes = self._RETRY_CODES + status_codes = kwargs.pop("retry_on_status_codes", []) + self._retry_on_status_codes = set(status_codes) | retry_codes + self._method_whitelist = frozenset(["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]) + self._respect_retry_after_header = True + super(RetryPolicyBase, self).__init__() + + @classmethod + def no_retries(cls: Type[ClsRetryPolicy]) -> ClsRetryPolicy: + """Disable retries. + + :return: A retry policy with retries disabled. + :rtype: ~azure.core.pipeline.policies.RetryPolicy or ~azure.core.pipeline.policies.AsyncRetryPolicy + """ + return cls(retry_total=0) + + def configure_retries(self, options: Dict[str, Any]) -> Dict[str, Any]: + """Configures the retry settings. + + :param options: keyword arguments from context. + :type options: dict + :return: A dict containing settings and history for retries. + :rtype: dict + """ + return { + "total": options.pop("retry_total", self.total_retries), + "connect": options.pop("retry_connect", self.connect_retries), + "read": options.pop("retry_read", self.read_retries), + "status": options.pop("retry_status", self.status_retries), + "backoff": options.pop("retry_backoff_factor", self.backoff_factor), + "max_backoff": options.pop("retry_backoff_max", self.BACKOFF_MAX), + "methods": options.pop("retry_on_methods", self._method_whitelist), + "timeout": options.pop("timeout", self.timeout), + "history": [], + } + + def get_backoff_time(self, settings: Dict[str, Any]) -> float: + """Returns the current backoff time. + + :param dict settings: The retry settings. + :return: The current backoff value. + :rtype: float + """ + # We want to consider only the last consecutive errors sequence (Ignore redirects). + consecutive_errors_len = len(settings["history"]) + if consecutive_errors_len <= 1: + return 0 + + if self.retry_mode == RetryMode.Fixed: + backoff_value = settings["backoff"] + else: + backoff_value = settings["backoff"] * (2 ** (consecutive_errors_len - 1)) + return min(settings["max_backoff"], backoff_value) + + def parse_retry_after(self, retry_after: str) -> float: + """Helper to parse Retry-After and get value in seconds. + + :param str retry_after: Retry-After header + :rtype: float + :return: Value of Retry-After in seconds. + """ + return _utils.parse_retry_after(retry_after) + + def get_retry_after(self, response: PipelineResponse[Any, AllHttpResponseType]) -> Optional[float]: + """Get the value of Retry-After in seconds. + + :param response: The PipelineResponse object + :type response: ~azure.core.pipeline.PipelineResponse + :return: Value of Retry-After in seconds. + :rtype: float or None + """ + return _utils.get_retry_after(response) + + def _is_connection_error(self, err: Exception) -> bool: + """Errors when we're fairly sure that the server did not receive the + request, so it should be safe to retry. + + :param err: The error raised by the pipeline. + :type err: ~azure.core.exceptions.AzureError + :return: True if connection error, False if not. + :rtype: bool + """ + return isinstance(err, ServiceRequestError) + + def _is_read_error(self, err: Exception) -> bool: + """Errors that occur after the request has been started, so we should + assume that the server began processing it. + + :param err: The error raised by the pipeline. + :type err: ~azure.core.exceptions.AzureError + :return: True if read error, False if not. + :rtype: bool + """ + return isinstance(err, ServiceResponseError) + + def _is_method_retryable( + self, + settings: Dict[str, Any], + request: HTTPRequestType, + response: Optional[AllHttpResponseType] = None, + ): + """Checks if a given HTTP method should be retried upon, depending if + it is included on the method allowlist. + + :param dict settings: The retry settings. + :param request: The HTTP request object. + :type request: ~azure.core.rest.HttpRequest + :param response: The HTTP response object. + :type response: ~azure.core.rest.HttpResponse or ~azure.core.rest.AsyncHttpResponse + :return: True if method should be retried upon. False if not in method allowlist. + :rtype: bool + """ + if response and request.method.upper() in ["POST", "PATCH"] and response.status_code in [500, 503, 504]: + return True + if request.method.upper() not in settings["methods"]: + return False + + return True + + def is_retry( + self, + settings: Dict[str, Any], + response: PipelineResponse[HTTPRequestType, AllHttpResponseType], + ) -> bool: + """Checks if method/status code is retryable. + + Based on allowlists and control variables such as the number of + total retries to allow, whether to respect the Retry-After header, + whether this header is present, and whether the returned status + code is on the list of status codes to be retried upon on the + presence of the aforementioned header. + + The behavior is: + - If status_code < 400: don't retry + - Else if Retry-After present: retry + - Else: retry based on the safe status code list ([408, 429, 500, 502, 503, 504]) + + + :param dict settings: The retry settings. + :param response: The PipelineResponse object + :type response: ~azure.core.pipeline.PipelineResponse + :return: True if method/status code is retryable. False if not retryable. + :rtype: bool + """ + if response.http_response.status_code < 400: + return False + has_retry_after = bool(response.http_response.headers.get("Retry-After")) + if has_retry_after and self._respect_retry_after_header: + return True + if not self._is_method_retryable(settings, response.http_request, response=response.http_response): + return False + return settings["total"] and response.http_response.status_code in self._retry_on_status_codes + + def is_exhausted(self, settings: Dict[str, Any]) -> bool: + """Checks if any retries left. + + :param dict settings: the retry settings + :return: False if have more retries. True if retries exhausted. + :rtype: bool + """ + settings_retry_count = ( + settings["total"], + settings["connect"], + settings["read"], + settings["status"], + ) + retry_counts: List[int] = list(filter(None, settings_retry_count)) + if not retry_counts: + return False + + return min(retry_counts) < 0 + + def increment( + self, + settings: Dict[str, Any], + response: Optional[ + Union[ + PipelineRequest[HTTPRequestType], + PipelineResponse[HTTPRequestType, AllHttpResponseType], + ] + ] = None, + error: Optional[Exception] = None, + ) -> bool: + """Increment the retry counters. + + :param settings: The retry settings. + :type settings: dict + :param response: A pipeline response object. + :type response: ~azure.core.pipeline.PipelineResponse + :param error: An error encountered during the request, or + None if the response was received successfully. + :type error: ~azure.core.exceptions.AzureError + :return: Whether any retry attempt is available + True if more retry attempts available, False otherwise + :rtype: bool + """ + # FIXME This code is not None safe: https://github.com/Azure/azure-sdk-for-python/issues/31528 + response = cast( + Union[ + PipelineRequest[HTTPRequestType], + PipelineResponse[HTTPRequestType, AllHttpResponseType], + ], + response, + ) + + settings["total"] -= 1 + + if isinstance(response, PipelineResponse) and response.http_response.status_code == 202: + return False + + if error and self._is_connection_error(error): + # Connect retry? + settings["connect"] -= 1 + settings["history"].append(RequestHistory(response.http_request, error=error)) + + elif error and self._is_read_error(error): + # Read retry? + settings["read"] -= 1 + if hasattr(response, "http_request"): + settings["history"].append(RequestHistory(response.http_request, error=error)) + + else: + # Incrementing because of a server error like a 500 in + # status_forcelist and the given method is in the allowlist + if response: + settings["status"] -= 1 + if hasattr(response, "http_request") and hasattr(response, "http_response"): + settings["history"].append( + RequestHistory(response.http_request, http_response=response.http_response) + ) + + if self.is_exhausted(settings): + return False + + if response.http_request.body and hasattr(response.http_request.body, "read"): + if "body_position" not in settings: + return False + try: + # attempt to rewind the body to the initial position + # If it has "read", it has "seek", so casting for mypy + cast(IO[bytes], response.http_request.body).seek(settings["body_position"], SEEK_SET) + except (UnsupportedOperation, ValueError, AttributeError): + # if body is not seekable, then retry would not work + return False + file_positions = settings.get("file_positions") + if response.http_request.files and file_positions: + try: + for value in response.http_request.files.values(): + file_name, body = value[0], value[1] + if file_name in file_positions: + position = file_positions[file_name] + body.seek(position, SEEK_SET) + except (UnsupportedOperation, ValueError, AttributeError): + # if body is not seekable, then retry would not work + return False + return True + + def update_context(self, context: PipelineContext, retry_settings: Dict[str, Any]) -> None: + """Updates retry history in pipeline context. + + :param context: The pipeline context. + :type context: ~azure.core.pipeline.PipelineContext + :param retry_settings: The retry settings. + :type retry_settings: dict + """ + if retry_settings["history"]: + context["history"] = retry_settings["history"] + + def _configure_timeout( + self, + request: PipelineRequest[HTTPRequestType], + absolute_timeout: float, + is_response_error: bool, + ) -> None: + if absolute_timeout <= 0: + if is_response_error: + raise ServiceResponseTimeoutError("Response timeout") + raise ServiceRequestTimeoutError("Request timeout") + + # if connection_timeout is already set, ensure it doesn't exceed absolute_timeout + connection_timeout = request.context.options.get("connection_timeout") + if connection_timeout: + request.context.options["connection_timeout"] = min(connection_timeout, absolute_timeout) + + # otherwise, try to ensure the transport's configured connection_timeout doesn't exceed absolute_timeout + # ("connection_config" isn't defined on Async/HttpTransport but all implementations in this library have it) + elif hasattr(request.context.transport, "connection_config"): + # FIXME This is fragile, should be refactored. Casting my way for mypy + # https://github.com/Azure/azure-sdk-for-python/issues/31530 + connection_config = cast( + ConnectionConfiguration, request.context.transport.connection_config # type: ignore + ) + + default_timeout = getattr(connection_config, "timeout", absolute_timeout) + try: + if absolute_timeout < default_timeout: + request.context.options["connection_timeout"] = absolute_timeout + except TypeError: + # transport.connection_config.timeout is something unexpected (not a number) + pass + + def _configure_positions(self, request: PipelineRequest[HTTPRequestType], retry_settings: Dict[str, Any]) -> None: + body_position = None + file_positions: Optional[Dict[str, int]] = None + if request.http_request.body and hasattr(request.http_request.body, "read"): + try: + # If it has "read", it has "tell", so casting for mypy + body_position = cast(IO[bytes], request.http_request.body).tell() + except (AttributeError, UnsupportedOperation): + # if body position cannot be obtained, then retries will not work + pass + else: + if request.http_request.files: + file_positions = {} + try: + for value in request.http_request.files.values(): + name, body = value[0], value[1] + if name and body and hasattr(body, "read"): + # If it has "read", it has "tell", so casting for mypy + position = cast(IO[bytes], body).tell() + file_positions[name] = position + except (AttributeError, UnsupportedOperation): + file_positions = None + + retry_settings["body_position"] = body_position + retry_settings["file_positions"] = file_positions + + +class RetryPolicy(RetryPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]): + """A retry policy. + + The retry policy in the pipeline can be configured directly, or tweaked on a per-call basis. + + :keyword int retry_total: Total number of retries to allow. Takes precedence over other counts. + Default value is 10. + + :keyword int retry_connect: How many connection-related errors to retry on. + These are errors raised before the request is sent to the remote server, + which we assume has not triggered the server to process the request. Default value is 3. + + :keyword int retry_read: How many times to retry on read errors. + These errors are raised after the request was sent to the server, so the + request may have side-effects. Default value is 3. + + :keyword int retry_status: How many times to retry on bad status codes. Default value is 3. + + :keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try + (most errors are resolved immediately by a second try without a delay). + In fixed mode, retry policy will always sleep for {backoff factor}. + In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))` + seconds. If the backoff_factor is 0.1, then the retry will sleep + for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8. + + :keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes). + + :keyword RetryMode retry_mode: Fixed or exponential delay between attemps, default is exponential. + + :keyword int timeout: Timeout setting for the operation in seconds, default is 604800s (7 days). + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_sync.py + :start-after: [START retry_policy] + :end-before: [END retry_policy] + :language: python + :dedent: 4 + :caption: Configuring a retry policy. + """ + + def _sleep_for_retry( + self, + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + transport: HttpTransport[HTTPRequestType, HTTPResponseType], + ) -> bool: + """Sleep based on the Retry-After response header value. + + :param response: The PipelineResponse object. + :type response: ~azure.core.pipeline.PipelineResponse + :param transport: The HTTP transport type. + :type transport: ~azure.core.pipeline.transport.HttpTransport + :return: Whether a sleep was done or not + :rtype: bool + """ + retry_after = self.get_retry_after(response) + if retry_after: + transport.sleep(retry_after) + return True + return False + + def _sleep_backoff( + self, + settings: Dict[str, Any], + transport: HttpTransport[HTTPRequestType, HTTPResponseType], + ) -> None: + """Sleep using exponential backoff. Immediately returns if backoff is 0. + + :param dict settings: The retry settings. + :param transport: The HTTP transport type. + :type transport: ~azure.core.pipeline.transport.HttpTransport + """ + backoff = self.get_backoff_time(settings) + if backoff <= 0: + return + transport.sleep(backoff) + + def sleep( + self, + settings: Dict[str, Any], + transport: HttpTransport[HTTPRequestType, HTTPResponseType], + response: Optional[PipelineResponse[HTTPRequestType, HTTPResponseType]] = None, + ) -> None: + """Sleep between retry attempts. + + This method will respect a server's ``Retry-After`` response header + and sleep the duration of the time requested. If that is not present, it + will use an exponential backoff. By default, the backoff factor is 0 and + this method will return immediately. + + :param dict settings: The retry settings. + :param transport: The HTTP transport type. + :type transport: ~azure.core.pipeline.transport.HttpTransport + :param response: The PipelineResponse object. + :type response: ~azure.core.pipeline.PipelineResponse + """ + if response: + slept = self._sleep_for_retry(response, transport) + if slept: + return + self._sleep_backoff(settings, transport) + + def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]: + """Sends the PipelineRequest object to the next policy. Uses retry settings if necessary. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + :return: Returns the PipelineResponse or raises error if maximum retries exceeded. + :rtype: ~azure.core.pipeline.PipelineResponse + :raises: ~azure.core.exceptions.AzureError if maximum retries exceeded. + :raises: ~azure.core.exceptions.ClientAuthenticationError if authentication + """ + retry_active = True + response = None + retry_settings = self.configure_retries(request.context.options) + self._configure_positions(request, retry_settings) + + absolute_timeout = retry_settings["timeout"] + is_response_error = True + + while retry_active: + start_time = time.time() + # PipelineContext types transport as a Union of HttpTransport and AsyncHttpTransport, but + # here we know that this is an HttpTransport. + # The correct fix is to make PipelineContext generic, but that's a breaking change and a lot of + # generic to update in Pipeline, PipelineClient, PipelineRequest, PipelineResponse, etc. + transport: HttpTransport[HTTPRequestType, HTTPResponseType] = cast( + HttpTransport[HTTPRequestType, HTTPResponseType], + request.context.transport, + ) + try: + self._configure_timeout(request, absolute_timeout, is_response_error) + request.context["retry_count"] = len(retry_settings["history"]) + response = self.next.send(request) + if self.is_retry(retry_settings, response): + retry_active = self.increment(retry_settings, response=response) + if retry_active: + self.sleep(retry_settings, transport, response=response) + is_response_error = True + continue + break + except ClientAuthenticationError: + # the authentication policy failed such that the client's request can't + # succeed--we'll never have a response to it, so propagate the exception + raise + except AzureError as err: + if absolute_timeout > 0 and self._is_method_retryable(retry_settings, request.http_request): + retry_active = self.increment(retry_settings, response=request, error=err) + if retry_active: + self.sleep(retry_settings, transport) + if isinstance(err, ServiceRequestError): + is_response_error = False + else: + is_response_error = True + continue + raise err + finally: + end_time = time.time() + if absolute_timeout: + absolute_timeout -= end_time - start_time + if not response: + raise AzureError("Maximum retries exceeded.") + + self.update_context(response.context, retry_settings) + return response diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_retry_async.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_retry_async.py new file mode 100644 index 00000000..874a4ee7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_retry_async.py @@ -0,0 +1,218 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +""" +This module is the requests implementation of Pipeline ABC +""" +from typing import TypeVar, Dict, Any, Optional, cast +import logging +import time +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.pipeline.transport import ( + AsyncHttpResponse as LegacyAsyncHttpResponse, + HttpRequest as LegacyHttpRequest, + AsyncHttpTransport, +) +from azure.core.rest import AsyncHttpResponse, HttpRequest +from azure.core.exceptions import ( + AzureError, + ClientAuthenticationError, + ServiceRequestError, +) +from ._base_async import AsyncHTTPPolicy +from ._retry import RetryPolicyBase + +AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType", AsyncHttpResponse, LegacyAsyncHttpResponse) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) + +_LOGGER = logging.getLogger(__name__) + + +class AsyncRetryPolicy(RetryPolicyBase, AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]): + """Async flavor of the retry policy. + + The async retry policy in the pipeline can be configured directly, or tweaked on a per-call basis. + + :keyword int retry_total: Total number of retries to allow. Takes precedence over other counts. + Default value is 10. + + :keyword int retry_connect: How many connection-related errors to retry on. + These are errors raised before the request is sent to the remote server, + which we assume has not triggered the server to process the request. Default value is 3. + + :keyword int retry_read: How many times to retry on read errors. + These errors are raised after the request was sent to the server, so the + request may have side-effects. Default value is 3. + + :keyword int retry_status: How many times to retry on bad status codes. Default value is 3. + + :keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try + (most errors are resolved immediately by a second try without a delay). + Retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))` + seconds. If the backoff_factor is 0.1, then the retry will sleep + for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8. + + :keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes). + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_async.py + :start-after: [START async_retry_policy] + :end-before: [END async_retry_policy] + :language: python + :dedent: 4 + :caption: Configuring an async retry policy. + """ + + async def _sleep_for_retry( + self, + response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType], + transport: AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType], + ) -> bool: + """Sleep based on the Retry-After response header value. + + :param response: The PipelineResponse object. + :type response: ~azure.core.pipeline.PipelineResponse + :param transport: The HTTP transport type. + :type transport: ~azure.core.pipeline.transport.AsyncHttpTransport + :return: Whether the retry-after value was found. + :rtype: bool + """ + retry_after = self.get_retry_after(response) + if retry_after: + await transport.sleep(retry_after) + return True + return False + + async def _sleep_backoff( + self, + settings: Dict[str, Any], + transport: AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType], + ) -> None: + """Sleep using exponential backoff. Immediately returns if backoff is 0. + + :param dict settings: The retry settings. + :param transport: The HTTP transport type. + :type transport: ~azure.core.pipeline.transport.AsyncHttpTransport + """ + backoff = self.get_backoff_time(settings) + if backoff <= 0: + return + await transport.sleep(backoff) + + async def sleep( + self, + settings: Dict[str, Any], + transport: AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType], + response: Optional[PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]] = None, + ) -> None: + """Sleep between retry attempts. + + This method will respect a server's ``Retry-After`` response header + and sleep the duration of the time requested. If that is not present, it + will use an exponential backoff. By default, the backoff factor is 0 and + this method will return immediately. + + :param dict settings: The retry settings. + :param transport: The HTTP transport type. + :type transport: ~azure.core.pipeline.transport.AsyncHttpTransport + :param response: The PipelineResponse object. + :type response: ~azure.core.pipeline.PipelineResponse + """ + if response: + slept = await self._sleep_for_retry(response, transport) + if slept: + return + await self._sleep_backoff(settings, transport) + + async def send( + self, request: PipelineRequest[HTTPRequestType] + ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]: + """Uses the configured retry policy to send the request to the next policy in the pipeline. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + :return: Returns the PipelineResponse or raises error if maximum retries exceeded. + :rtype: ~azure.core.pipeline.PipelineResponse + :raise: ~azure.core.exceptions.AzureError if maximum retries exceeded. + :raise: ~azure.core.exceptions.ClientAuthenticationError if authentication fails + """ + retry_active = True + response = None + retry_settings = self.configure_retries(request.context.options) + self._configure_positions(request, retry_settings) + + absolute_timeout = retry_settings["timeout"] + is_response_error = True + + while retry_active: + start_time = time.time() + # PipelineContext types transport as a Union of HttpTransport and AsyncHttpTransport, but + # here we know that this is an AsyncHttpTransport. + # The correct fix is to make PipelineContext generic, but that's a breaking change and a lot of + # generic to update in Pipeline, PipelineClient, PipelineRequest, PipelineResponse, etc. + transport: AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType] = cast( + AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType], + request.context.transport, + ) + try: + self._configure_timeout(request, absolute_timeout, is_response_error) + request.context["retry_count"] = len(retry_settings["history"]) + response = await self.next.send(request) + if self.is_retry(retry_settings, response): + retry_active = self.increment(retry_settings, response=response) + if retry_active: + await self.sleep( + retry_settings, + transport, + response=response, + ) + is_response_error = True + continue + break + except ClientAuthenticationError: + # the authentication policy failed such that the client's request can't + # succeed--we'll never have a response to it, so propagate the exception + raise + except AzureError as err: + if absolute_timeout > 0 and self._is_method_retryable(retry_settings, request.http_request): + retry_active = self.increment(retry_settings, response=request, error=err) + if retry_active: + await self.sleep(retry_settings, transport) + if isinstance(err, ServiceRequestError): + is_response_error = False + else: + is_response_error = True + continue + raise err + finally: + end_time = time.time() + if absolute_timeout: + absolute_timeout -= end_time - start_time + if not response: + raise AzureError("Maximum retries exceeded.") + + self.update_context(response.context, retry_settings) + return response diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_sensitive_header_cleanup_policy.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_sensitive_header_cleanup_policy.py new file mode 100644 index 00000000..8496f7ab --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_sensitive_header_cleanup_policy.py @@ -0,0 +1,80 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import List, Optional, Any, TypeVar +from azure.core.pipeline import PipelineRequest +from azure.core.pipeline.transport import ( + HttpRequest as LegacyHttpRequest, + HttpResponse as LegacyHttpResponse, +) +from azure.core.rest import HttpRequest, HttpResponse +from ._base import SansIOHTTPPolicy + +HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse) +HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) + + +class SensitiveHeaderCleanupPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """A simple policy that cleans up sensitive headers + + :keyword list[str] blocked_redirect_headers: The headers to clean up when redirecting to another domain. + :keyword bool disable_redirect_cleanup: Opt out cleaning up sensitive headers when redirecting to another domain. + """ + + DEFAULT_SENSITIVE_HEADERS = set( + [ + "Authorization", + "x-ms-authorization-auxiliary", + ] + ) + + def __init__( + self, # pylint: disable=unused-argument + *, + blocked_redirect_headers: Optional[List[str]] = None, + disable_redirect_cleanup: bool = False, + **kwargs: Any + ) -> None: + self._disable_redirect_cleanup = disable_redirect_cleanup + self._blocked_redirect_headers = ( + SensitiveHeaderCleanupPolicy.DEFAULT_SENSITIVE_HEADERS + if blocked_redirect_headers is None + else blocked_redirect_headers + ) + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """This is executed before sending the request to the next policy. + + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + """ + # "insecure_domain_change" is used to indicate that a redirect + # has occurred to a different domain. This tells the SensitiveHeaderCleanupPolicy + # to clean up sensitive headers. We need to remove it before sending the request + # to the transport layer. + insecure_domain_change = request.context.options.pop("insecure_domain_change", False) + if not self._disable_redirect_cleanup and insecure_domain_change: + for header in self._blocked_redirect_headers: + request.http_request.headers.pop(header, None) diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_universal.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_universal.py new file mode 100644 index 00000000..72548aee --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_universal.py @@ -0,0 +1,746 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +""" +This module is the requests implementation of Pipeline ABC +""" +import json +import inspect +import logging +import os +import platform +import xml.etree.ElementTree as ET +import types +import re +import uuid +from typing import IO, cast, Union, Optional, AnyStr, Dict, Any, Set, MutableMapping +import urllib.parse + +from azure.core import __version__ as azcore_version +from azure.core.exceptions import DecodeError + +from azure.core.pipeline import PipelineRequest, PipelineResponse +from ._base import SansIOHTTPPolicy + +from ..transport import HttpRequest as LegacyHttpRequest +from ..transport._base import _HttpResponseBase as LegacySansIOHttpResponse +from ...rest import HttpRequest +from ...rest._rest_py3 import _HttpResponseBase as SansIOHttpResponse + +_LOGGER = logging.getLogger(__name__) + +HTTPRequestType = Union[LegacyHttpRequest, HttpRequest] +HTTPResponseType = Union[LegacySansIOHttpResponse, SansIOHttpResponse] +PipelineResponseType = PipelineResponse[HTTPRequestType, HTTPResponseType] + + +class HeadersPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """A simple policy that sends the given headers with the request. + + This will overwrite any headers already defined in the request. Headers can be + configured up front, where any custom headers will be applied to all outgoing + operations, and additional headers can also be added dynamically per operation. + + :param dict base_headers: Headers to send with the request. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_sansio.py + :start-after: [START headers_policy] + :end-before: [END headers_policy] + :language: python + :dedent: 4 + :caption: Configuring a headers policy. + """ + + def __init__(self, base_headers: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + self._headers: Dict[str, str] = base_headers or {} + self._headers.update(kwargs.pop("headers", {})) + + @property + def headers(self) -> Dict[str, str]: + """The current headers collection. + + :rtype: dict[str, str] + :return: The current headers collection. + """ + return self._headers + + def add_header(self, key: str, value: str) -> None: + """Add a header to the configuration to be applied to all requests. + + :param str key: The header. + :param str value: The header's value. + """ + self._headers[key] = value + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Updates with the given headers before sending the request to the next policy. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + """ + request.http_request.headers.update(self.headers) + additional_headers = request.context.options.pop("headers", {}) + if additional_headers: + request.http_request.headers.update(additional_headers) + + +class _Unset: + pass + + +class RequestIdPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """A simple policy that sets the given request id in the header. + + This will overwrite request id that is already defined in the request. Request id can be + configured up front, where the request id will be applied to all outgoing + operations, and additional request id can also be set dynamically per operation. + + :keyword str request_id: The request id to be added into header. + :keyword bool auto_request_id: Auto generates a unique request ID per call if true which is by default. + :keyword str request_id_header_name: Header name to use. Default is "x-ms-client-request-id". + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_sansio.py + :start-after: [START request_id_policy] + :end-before: [END request_id_policy] + :language: python + :dedent: 4 + :caption: Configuring a request id policy. + """ + + def __init__( + self, # pylint: disable=unused-argument + *, + request_id: Union[str, Any] = _Unset, + auto_request_id: bool = True, + request_id_header_name: str = "x-ms-client-request-id", + **kwargs: Any + ) -> None: + super() + self._request_id = request_id + self._auto_request_id = auto_request_id + self._request_id_header_name = request_id_header_name + + def set_request_id(self, value: str) -> None: + """Add the request id to the configuration to be applied to all requests. + + :param str value: The request id value. + """ + self._request_id = value + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Updates with the given request id before sending the request to the next policy. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + """ + request_id = unset = object() + if "request_id" in request.context.options: + request_id = request.context.options.pop("request_id") + if request_id is None: + return + elif self._request_id is None: + return + elif self._request_id is not _Unset: + if self._request_id_header_name in request.http_request.headers: + return + request_id = self._request_id + elif self._auto_request_id: + if self._request_id_header_name in request.http_request.headers: + return + request_id = str(uuid.uuid1()) + if request_id is not unset: + header = {self._request_id_header_name: cast(str, request_id)} + request.http_request.headers.update(header) + + +class UserAgentPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """User-Agent Policy. Allows custom values to be added to the User-Agent header. + + :param str base_user_agent: Sets the base user agent value. + + :keyword bool user_agent_overwrite: Overwrites User-Agent when True. Defaults to False. + :keyword bool user_agent_use_env: Gets user-agent from environment. Defaults to True. + :keyword str user_agent: If specified, this will be added in front of the user agent string. + :keyword str sdk_moniker: If specified, the user agent string will be + azsdk-python-[sdk_moniker] Python/[python_version] ([platform_version]) + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_sansio.py + :start-after: [START user_agent_policy] + :end-before: [END user_agent_policy] + :language: python + :dedent: 4 + :caption: Configuring a user agent policy. + """ + + _USERAGENT = "User-Agent" + _ENV_ADDITIONAL_USER_AGENT = "AZURE_HTTP_USER_AGENT" + + def __init__(self, base_user_agent: Optional[str] = None, **kwargs: Any) -> None: + self.overwrite: bool = kwargs.pop("user_agent_overwrite", False) + self.use_env: bool = kwargs.pop("user_agent_use_env", True) + application_id: Optional[str] = kwargs.pop("user_agent", None) + sdk_moniker: str = kwargs.pop("sdk_moniker", "core/{}".format(azcore_version)) + + if base_user_agent: + self._user_agent = base_user_agent + else: + self._user_agent = "azsdk-python-{} Python/{} ({})".format( + sdk_moniker, platform.python_version(), platform.platform() + ) + + if application_id: + self._user_agent = "{} {}".format(application_id, self._user_agent) + + @property + def user_agent(self) -> str: + """The current user agent value. + + :return: The current user agent value. + :rtype: str + """ + if self.use_env: + add_user_agent_header = os.environ.get(self._ENV_ADDITIONAL_USER_AGENT, None) + if add_user_agent_header is not None: + return "{} {}".format(self._user_agent, add_user_agent_header) + return self._user_agent + + def add_user_agent(self, value: str) -> None: + """Add value to current user agent with a space. + :param str value: value to add to user agent. + """ + self._user_agent = "{} {}".format(self._user_agent, value) + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Modifies the User-Agent header before the request is sent. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + """ + http_request = request.http_request + options_dict = request.context.options + if "user_agent" in options_dict: + user_agent = options_dict.pop("user_agent") + if options_dict.pop("user_agent_overwrite", self.overwrite): + http_request.headers[self._USERAGENT] = user_agent + else: + user_agent = "{} {}".format(user_agent, self.user_agent) + http_request.headers[self._USERAGENT] = user_agent + + elif self.overwrite or self._USERAGENT not in http_request.headers: + http_request.headers[self._USERAGENT] = self.user_agent + + +class NetworkTraceLoggingPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """The logging policy in the pipeline is used to output HTTP network trace to the configured logger. + + This accepts both global configuration, and per-request level with "enable_http_logger" + + :param bool logging_enable: Use to enable per operation. Defaults to False. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_sansio.py + :start-after: [START network_trace_logging_policy] + :end-before: [END network_trace_logging_policy] + :language: python + :dedent: 4 + :caption: Configuring a network trace logging policy. + """ + + def __init__(self, logging_enable: bool = False, **kwargs: Any): # pylint: disable=unused-argument + self.enable_http_logger = logging_enable + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Logs HTTP request to the DEBUG logger. + + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + """ + http_request = request.http_request + options = request.context.options + logging_enable = options.pop("logging_enable", self.enable_http_logger) + request.context["logging_enable"] = logging_enable + if logging_enable: + if not _LOGGER.isEnabledFor(logging.DEBUG): + return + + try: + log_string = "Request URL: '{}'".format(http_request.url) + log_string += "\nRequest method: '{}'".format(http_request.method) + log_string += "\nRequest headers:" + for header, value in http_request.headers.items(): + log_string += "\n '{}': '{}'".format(header, value) + log_string += "\nRequest body:" + + # We don't want to log the binary data of a file upload. + if isinstance(http_request.body, types.GeneratorType): + log_string += "\nFile upload" + _LOGGER.debug(log_string) + return + try: + if isinstance(http_request.body, types.AsyncGeneratorType): + log_string += "\nFile upload" + _LOGGER.debug(log_string) + return + except AttributeError: + pass + if http_request.body: + log_string += "\n{}".format(str(http_request.body)) + _LOGGER.debug(log_string) + return + log_string += "\nThis request has no body" + _LOGGER.debug(log_string) + except Exception as err: # pylint: disable=broad-except + _LOGGER.debug("Failed to log request: %r", err) + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> None: + """Logs HTTP response to the DEBUG logger. + + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + :param response: The PipelineResponse object. + :type response: ~azure.core.pipeline.PipelineResponse + """ + http_response = response.http_response + try: + logging_enable = response.context["logging_enable"] + if logging_enable: + if not _LOGGER.isEnabledFor(logging.DEBUG): + return + + log_string = "Response status: '{}'".format(http_response.status_code) + log_string += "\nResponse headers:" + for res_header, value in http_response.headers.items(): + log_string += "\n '{}': '{}'".format(res_header, value) + + # We don't want to log binary data if the response is a file. + log_string += "\nResponse content:" + pattern = re.compile(r'attachment; ?filename=["\w.]+', re.IGNORECASE) + header = http_response.headers.get("content-disposition") + + if header and pattern.match(header): + filename = header.partition("=")[2] + log_string += "\nFile attachments: {}".format(filename) + elif http_response.headers.get("content-type", "").endswith("octet-stream"): + log_string += "\nBody contains binary data." + elif http_response.headers.get("content-type", "").startswith("image"): + log_string += "\nBody contains image data." + else: + if response.context.options.get("stream", False): + log_string += "\nBody is streamable." + else: + log_string += "\n{}".format(http_response.text()) + _LOGGER.debug(log_string) + except Exception as err: # pylint: disable=broad-except + _LOGGER.debug("Failed to log response: %s", repr(err)) + + +class _HiddenClassProperties(type): + # Backward compatible for DEFAULT_HEADERS_WHITELIST + # https://github.com/Azure/azure-sdk-for-python/issues/26331 + + @property + def DEFAULT_HEADERS_WHITELIST(cls) -> Set[str]: + return cls.DEFAULT_HEADERS_ALLOWLIST + + @DEFAULT_HEADERS_WHITELIST.setter + def DEFAULT_HEADERS_WHITELIST(cls, value: Set[str]) -> None: + cls.DEFAULT_HEADERS_ALLOWLIST = value + + +class HttpLoggingPolicy( + SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType], + metaclass=_HiddenClassProperties, +): + """The Pipeline policy that handles logging of HTTP requests and responses. + + :param logger: The logger to use for logging. Default to azure.core.pipeline.policies.http_logging_policy. + :type logger: logging.Logger + """ + + DEFAULT_HEADERS_ALLOWLIST: Set[str] = set( + [ + "x-ms-request-id", + "x-ms-client-request-id", + "x-ms-return-client-request-id", + "x-ms-error-code", + "traceparent", + "Accept", + "Cache-Control", + "Connection", + "Content-Length", + "Content-Type", + "Date", + "ETag", + "Expires", + "If-Match", + "If-Modified-Since", + "If-None-Match", + "If-Unmodified-Since", + "Last-Modified", + "Pragma", + "Request-Id", + "Retry-After", + "Server", + "Transfer-Encoding", + "User-Agent", + "WWW-Authenticate", # OAuth Challenge header. + "x-vss-e2eid", # Needed by Azure DevOps pipelines. + "x-msedge-ref", # Needed by Azure DevOps pipelines. + ] + ) + REDACTED_PLACEHOLDER: str = "REDACTED" + MULTI_RECORD_LOG: str = "AZURE_SDK_LOGGING_MULTIRECORD" + + def __init__(self, logger: Optional[logging.Logger] = None, **kwargs: Any): # pylint: disable=unused-argument + self.logger: logging.Logger = logger or logging.getLogger("azure.core.pipeline.policies.http_logging_policy") + self.allowed_query_params: Set[str] = set() + self.allowed_header_names: Set[str] = set(self.__class__.DEFAULT_HEADERS_ALLOWLIST) + + def _redact_query_param(self, key: str, value: str) -> str: + lower_case_allowed_query_params = [param.lower() for param in self.allowed_query_params] + return value if key.lower() in lower_case_allowed_query_params else HttpLoggingPolicy.REDACTED_PLACEHOLDER + + def _redact_header(self, key: str, value: str) -> str: + lower_case_allowed_header_names = [header.lower() for header in self.allowed_header_names] + return value if key.lower() in lower_case_allowed_header_names else HttpLoggingPolicy.REDACTED_PLACEHOLDER + + def on_request( # pylint: disable=too-many-return-statements + self, request: PipelineRequest[HTTPRequestType] + ) -> None: + """Logs HTTP method, url and headers. + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + """ + http_request = request.http_request + options = request.context.options + # Get logger in my context first (request has been retried) + # then read from kwargs (pop if that's the case) + # then use my instance logger + logger = request.context.setdefault("logger", options.pop("logger", self.logger)) + + if not logger.isEnabledFor(logging.INFO): + return + + try: + parsed_url = list(urllib.parse.urlparse(http_request.url)) + parsed_qp = urllib.parse.parse_qsl(parsed_url[4], keep_blank_values=True) + filtered_qp = [(key, self._redact_query_param(key, value)) for key, value in parsed_qp] + # 4 is query + parsed_url[4] = "&".join(["=".join(part) for part in filtered_qp]) + redacted_url = urllib.parse.urlunparse(parsed_url) + + multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False) + if multi_record: + logger.info("Request URL: %r", redacted_url) + logger.info("Request method: %r", http_request.method) + logger.info("Request headers:") + for header, value in http_request.headers.items(): + value = self._redact_header(header, value) + logger.info(" %r: %r", header, value) + if isinstance(http_request.body, types.GeneratorType): + logger.info("File upload") + return + try: + if isinstance(http_request.body, types.AsyncGeneratorType): + logger.info("File upload") + return + except AttributeError: + pass + if http_request.body: + logger.info("A body is sent with the request") + return + logger.info("No body was attached to the request") + return + log_string = "Request URL: '{}'".format(redacted_url) + log_string += "\nRequest method: '{}'".format(http_request.method) + log_string += "\nRequest headers:" + for header, value in http_request.headers.items(): + value = self._redact_header(header, value) + log_string += "\n '{}': '{}'".format(header, value) + if isinstance(http_request.body, types.GeneratorType): + log_string += "\nFile upload" + logger.info(log_string) + return + try: + if isinstance(http_request.body, types.AsyncGeneratorType): + log_string += "\nFile upload" + logger.info(log_string) + return + except AttributeError: + pass + if http_request.body: + log_string += "\nA body is sent with the request" + logger.info(log_string) + return + log_string += "\nNo body was attached to the request" + logger.info(log_string) + + except Exception as err: # pylint: disable=broad-except + logger.warning("Failed to log request: %s", repr(err)) + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> None: + http_response = response.http_response + + # Get logger in my context first (request has been retried) + # then read from kwargs (pop if that's the case) + # then use my instance logger + # If on_request was called, should always read from context + options = request.context.options + logger = request.context.setdefault("logger", options.pop("logger", self.logger)) + + try: + if not logger.isEnabledFor(logging.INFO): + return + + multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False) + if multi_record: + logger.info("Response status: %r", http_response.status_code) + logger.info("Response headers:") + for res_header, value in http_response.headers.items(): + value = self._redact_header(res_header, value) + logger.info(" %r: %r", res_header, value) + return + log_string = "Response status: {}".format(http_response.status_code) + log_string += "\nResponse headers:" + for res_header, value in http_response.headers.items(): + value = self._redact_header(res_header, value) + log_string += "\n '{}': '{}'".format(res_header, value) + logger.info(log_string) + except Exception as err: # pylint: disable=broad-except + logger.warning("Failed to log response: %s", repr(err)) + + +class ContentDecodePolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """Policy for decoding unstreamed response content. + + :param response_encoding: The encoding to use if known for this service (will disable auto-detection) + :type response_encoding: str + """ + + # Accept "text" because we're open minded people... + JSON_REGEXP = re.compile(r"^(application|text)/([0-9a-z+.-]+\+)?json$") + + # Name used in context + CONTEXT_NAME = "deserialized_data" + + def __init__( + self, response_encoding: Optional[str] = None, **kwargs: Any # pylint: disable=unused-argument + ) -> None: + self._response_encoding = response_encoding + + @classmethod + def deserialize_from_text( + cls, + data: Optional[Union[AnyStr, IO[AnyStr]]], + mime_type: Optional[str] = None, + response: Optional[HTTPResponseType] = None, + ) -> Any: + """Decode response data according to content-type. + + Accept a stream of data as well, but will be load at once in memory for now. + If no content-type, will return the string version (not bytes, not stream) + + :param data: The data to deserialize. + :type data: str or bytes or file-like object + :param response: The HTTP response. + :type response: ~azure.core.pipeline.transport.HttpResponse + :param str mime_type: The mime type. As mime type, charset is not expected. + :param response: If passed, exception will be annotated with that response + :type response: any + :raises ~azure.core.exceptions.DecodeError: If deserialization fails + :returns: A dict (JSON), XML tree or str, depending of the mime_type + :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str + """ + if not data: + return None + + if hasattr(data, "read"): + # Assume a stream + data = cast(IO, data).read() + + if isinstance(data, bytes): + data_as_str = data.decode(encoding="utf-8-sig") + else: + # Explain to mypy the correct type. + data_as_str = cast(str, data) + + if mime_type is None: + return data_as_str + + if cls.JSON_REGEXP.match(mime_type): + try: + return json.loads(data_as_str) + except ValueError as err: + raise DecodeError( + message="JSON is invalid: {}".format(err), + response=response, + error=err, + ) from err + elif "xml" in (mime_type or []): + try: + return ET.fromstring(data_as_str) # nosec + except ET.ParseError as err: + # It might be because the server has an issue, and returned JSON with + # content-type XML.... + # So let's try a JSON load, and if it's still broken + # let's flow the initial exception + def _json_attemp(data): + try: + return True, json.loads(data) + except ValueError: + return False, None # Don't care about this one + + success, json_result = _json_attemp(data) + if success: + return json_result + # If i'm here, it's not JSON, it's not XML, let's scream + # and raise the last context in this block (the XML exception) + # The function hack is because Py2.7 messes up with exception + # context otherwise. + _LOGGER.critical("Wasn't XML not JSON, failing") + raise DecodeError("XML is invalid", response=response) from err + elif mime_type.startswith("text/"): + return data_as_str + raise DecodeError("Cannot deserialize content-type: {}".format(mime_type)) + + @classmethod + def deserialize_from_http_generics( + cls, + response: HTTPResponseType, + encoding: Optional[str] = None, + ) -> Any: + """Deserialize from HTTP response. + + Headers will tested for "content-type" + + :param response: The HTTP response + :type response: any + :param str encoding: The encoding to use if known for this service (will disable auto-detection) + :raises ~azure.core.exceptions.DecodeError: If deserialization fails + :returns: A dict (JSON), XML tree or str, depending of the mime_type + :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str + """ + # Try to use content-type from headers if available + if response.content_type: + mime_type = response.content_type.split(";")[0].strip().lower() + # Ouch, this server did not declare what it sent... + # Let's guess it's JSON... + # Also, since Autorest was considering that an empty body was a valid JSON, + # need that test as well.... + else: + mime_type = "application/json" + + # Rely on transport implementation to give me "text()" decoded correctly + if hasattr(response, "read"): + # since users can call deserialize_from_http_generics by themselves + # we want to make sure our new responses are read before we try to + # deserialize. Only read sync responses since we're in a sync function + # + # Technically HttpResponse do not contain a "read()", but we don't know what + # people have been able to pass here, so keep this code for safety, + # even if it's likely dead code + if not inspect.iscoroutinefunction(response.read): # type: ignore + response.read() # type: ignore + return cls.deserialize_from_text(response.text(encoding), mime_type, response=response) + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + options = request.context.options + response_encoding = options.pop("response_encoding", self._response_encoding) + if response_encoding: + request.context["response_encoding"] = response_encoding + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> None: + """Extract data from the body of a REST response object. + This will load the entire payload in memory. + Will follow Content-Type to parse. + We assume everything is UTF8 (BOM acceptable). + + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + :param response: The PipelineResponse object. + :type response: ~azure.core.pipeline.PipelineResponse + :raises JSONDecodeError: If JSON is requested and parsing is impossible. + :raises UnicodeDecodeError: If bytes is not UTF8 + :raises xml.etree.ElementTree.ParseError: If bytes is not valid XML + :raises ~azure.core.exceptions.DecodeError: If deserialization fails + """ + # If response was asked as stream, do NOT read anything and quit now + if response.context.options.get("stream", True): + return + + response_encoding = request.context.get("response_encoding") + + response.context[self.CONTEXT_NAME] = self.deserialize_from_http_generics( + response.http_response, response_encoding + ) + + +class ProxyPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): + """A proxy policy. + + Dictionary mapping protocol or protocol and host to the URL of the proxy + to be used on each Request. + + :param MutableMapping proxies: Maps protocol or protocol and hostname to the URL + of the proxy. + + .. admonition:: Example: + + .. literalinclude:: ../samples/test_example_sansio.py + :start-after: [START proxy_policy] + :end-before: [END proxy_policy] + :language: python + :dedent: 4 + :caption: Configuring a proxy policy. + """ + + def __init__( + self, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any + ): # pylint: disable=unused-argument + self.proxies = proxies + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + ctxt = request.context.options + if self.proxies and "proxies" not in ctxt: + ctxt["proxies"] = self.proxies diff --git a/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_utils.py b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_utils.py new file mode 100644 index 00000000..dce2c45b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/pipeline/policies/_utils.py @@ -0,0 +1,204 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import datetime +import email.utils +from typing import Optional, cast, Union, Tuple +from urllib.parse import urlparse + +from azure.core.pipeline.transport import ( + HttpResponse as LegacyHttpResponse, + AsyncHttpResponse as LegacyAsyncHttpResponse, + HttpRequest as LegacyHttpRequest, +) +from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest + + +from ...utils._utils import _FixedOffset, case_insensitive_dict +from .. import PipelineResponse + +AllHttpResponseType = Union[HttpResponse, LegacyHttpResponse, AsyncHttpResponse, LegacyAsyncHttpResponse] +HTTPRequestType = Union[HttpRequest, LegacyHttpRequest] + + +def _parse_http_date(text: str) -> datetime.datetime: + """Parse a HTTP date format into datetime. + + :param str text: Text containing a date in HTTP format + :rtype: datetime.datetime + :return: The parsed datetime + """ + parsed_date = email.utils.parsedate_tz(text) + if not parsed_date: + raise ValueError("Invalid HTTP date") + tz_offset = cast(int, parsed_date[9]) # Look at the code, tz_offset is always an int, at worst 0 + return datetime.datetime(*parsed_date[:6], tzinfo=_FixedOffset(tz_offset / 60)) + + +def parse_retry_after(retry_after: str) -> float: + """Helper to parse Retry-After and get value in seconds. + + :param str retry_after: Retry-After header + :rtype: float + :return: Value of Retry-After in seconds. + """ + delay: float # Using the Mypy recommendation to use float for "int or float" + try: + delay = float(retry_after) + except ValueError: + # Not an integer? Try HTTP date + retry_date = _parse_http_date(retry_after) + delay = (retry_date - datetime.datetime.now(retry_date.tzinfo)).total_seconds() + return max(0, delay) + + +def get_retry_after(response: PipelineResponse[HTTPRequestType, AllHttpResponseType]) -> Optional[float]: + """Get the value of Retry-After in seconds. + + :param response: The PipelineResponse object + :type response: ~azure.core.pipeline.PipelineResponse + :return: Value of Retry-After in seconds. + :rtype: float or None + """ + headers = case_insensitive_dict(response.http_response.headers) + retry_after = headers.get("retry-after") + if retry_after: + return parse_retry_after(retry_after) + for ms_header in ["retry-after-ms", "x-ms-retry-after-ms"]: + retry_after = headers.get(ms_header) + if retry_after: + parsed_retry_after = parse_retry_after(retry_after) + return parsed_retry_after / 1000.0 + return None + + +def get_domain(url: str) -> str: + """Get the domain of an url. + + :param str url: The url. + :rtype: str + :return: The domain of the url. + """ + return str(urlparse(url).netloc).lower() + + +def get_challenge_parameter(headers, challenge_scheme: str, challenge_parameter: str) -> Optional[str]: + """ + Parses the specified parameter from a challenge header found in the response. + + :param dict[str, str] headers: The response headers to parse. + :param str challenge_scheme: The challenge scheme containing the challenge parameter, e.g., "Bearer". + :param str challenge_parameter: The parameter key name to search for. + :return: The value of the parameter name if found. + :rtype: str or None + """ + header_value = headers.get("WWW-Authenticate") + if not header_value: + return None + + scheme = challenge_scheme + parameter = challenge_parameter + header_span = header_value + + # Iterate through each challenge value. + while True: + challenge = get_next_challenge(header_span) + if not challenge: + break + challenge_key, header_span = challenge + if challenge_key.lower() != scheme.lower(): + continue + # Enumerate each key-value parameter until we find the parameter key on the specified scheme challenge. + while True: + parameters = get_next_parameter(header_span) + if not parameters: + break + key, value, header_span = parameters + if key.lower() == parameter.lower(): + return value + + return None + + +def get_next_challenge(header_value: str) -> Optional[Tuple[str, str]]: + """ + Iterates through the challenge schemes present in a challenge header. + + :param str header_value: The header value which will be sliced to remove the first parsed challenge key. + :return: The parsed challenge scheme and the remaining header value. + :rtype: tuple[str, str] or None + """ + header_value = header_value.lstrip(" ") + end_of_challenge_key = header_value.find(" ") + + if end_of_challenge_key < 0: + return None + + challenge_key = header_value[:end_of_challenge_key] + header_value = header_value[end_of_challenge_key + 1 :] + + return challenge_key, header_value + + +def get_next_parameter(header_value: str, separator: str = "=") -> Optional[Tuple[str, str, str]]: + """ + Iterates through a challenge header value to extract key-value parameters. + + :param str header_value: The header value after being parsed by get_next_challenge. + :param str separator: The challenge parameter key-value pair separator, default is '='. + :return: The next available challenge parameter as a tuple (param_key, param_value, remaining header_value). + :rtype: tuple[str, str, str] or None + """ + space_or_comma = " ," + header_value = header_value.lstrip(space_or_comma) + + next_space = header_value.find(" ") + next_separator = header_value.find(separator) + + if next_space < next_separator and next_space != -1: + return None + + if next_separator < 0: + return None + + param_key = header_value[:next_separator].strip() + header_value = header_value[next_separator + 1 :] + + quote_index = header_value.find('"') + + if quote_index >= 0: + header_value = header_value[quote_index + 1 :] + param_value = header_value[: header_value.find('"')] + else: + trailing_delimiter_index = header_value.find(" ") + if trailing_delimiter_index >= 0: + param_value = header_value[:trailing_delimiter_index] + else: + param_value = header_value + + if header_value != param_value: + header_value = header_value[len(param_value) + 1 :] + + return param_key, param_value, header_value |