diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/mgmt/core')
15 files changed, 1467 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/__init__.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/__init__.py new file mode 100644 index 00000000..b4f429bb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/__init__.py @@ -0,0 +1,33 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- + + +from ._version import VERSION +from ._pipeline_client import ARMPipelineClient +from ._async_pipeline_client import AsyncARMPipelineClient + +__all__ = ["ARMPipelineClient", "AsyncARMPipelineClient"] +__version__ = VERSION diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/_async_pipeline_client.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/_async_pipeline_client.py new file mode 100644 index 00000000..da65d635 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/_async_pipeline_client.py @@ -0,0 +1,70 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import TypeVar, AsyncContextManager, Any +from collections.abc import MutableSequence +from azure.core import AsyncPipelineClient +from .policies import ( + AsyncARMAutoResourceProviderRegistrationPolicy, + ARMHttpLoggingPolicy, +) + +HTTPRequestType = TypeVar("HTTPRequestType") +AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType", bound=AsyncContextManager) + + +class AsyncARMPipelineClient(AsyncPipelineClient[HTTPRequestType, AsyncHTTPResponseType]): + """A pipeline client designed for ARM explicitly. + + :param str base_url: URL for the request. + :keyword AsyncPipeline pipeline: If omitted, a Pipeline object is created and returned. + :keyword list[AsyncHTTPPolicy] policies: If omitted, the standard policies of the configuration object is used. + :keyword per_call_policies: If specified, the policies will be added into the policy list before RetryPolicy + :paramtype per_call_policies: Union[AsyncHTTPPolicy, SansIOHTTPPolicy, + list[AsyncHTTPPolicy], list[SansIOHTTPPolicy]] + :keyword per_retry_policies: If specified, the policies will be added into the policy list after RetryPolicy + :paramtype per_retry_policies: Union[AsyncHTTPPolicy, SansIOHTTPPolicy, + list[AsyncHTTPPolicy], list[SansIOHTTPPolicy]] + :keyword AsyncHttpTransport transport: If omitted, AioHttpTransport is used for asynchronous transport. + """ + + def __init__(self, base_url: str, **kwargs: Any): + if "policies" not in kwargs: + config = kwargs.get("config") + if not config: + raise ValueError("Current implementation requires to pass 'config' if you don't pass 'policies'") + per_call_policies = kwargs.get("per_call_policies", []) + if isinstance(per_call_policies, MutableSequence): + per_call_policies.append(AsyncARMAutoResourceProviderRegistrationPolicy()) + else: + per_call_policies = [ + per_call_policies, + AsyncARMAutoResourceProviderRegistrationPolicy(), + ] + kwargs["per_call_policies"] = per_call_policies + if not config.http_logging_policy: + config.http_logging_policy = kwargs.get("http_logging_policy", ARMHttpLoggingPolicy(**kwargs)) + kwargs["config"] = config + super(AsyncARMPipelineClient, self).__init__(base_url, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/_pipeline_client.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/_pipeline_client.py new file mode 100644 index 00000000..cc5a95fc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/_pipeline_client.py @@ -0,0 +1,65 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import TypeVar, Any +from collections.abc import MutableSequence +from azure.core import PipelineClient +from .policies import ARMAutoResourceProviderRegistrationPolicy, ARMHttpLoggingPolicy + +HTTPResponseType = TypeVar("HTTPResponseType") +HTTPRequestType = TypeVar("HTTPRequestType") + + +class ARMPipelineClient(PipelineClient[HTTPRequestType, HTTPResponseType]): + """A pipeline client designed for ARM explicitly. + + :param str base_url: URL for the request. + :keyword Pipeline pipeline: If omitted, a Pipeline object is created and returned. + :keyword list[HTTPPolicy] policies: If omitted, the standard policies of the configuration object is used. + :keyword per_call_policies: If specified, the policies will be added into the policy list before RetryPolicy + :paramtype per_call_policies: Union[HTTPPolicy, SansIOHTTPPolicy, list[HTTPPolicy], list[SansIOHTTPPolicy]] + :keyword per_retry_policies: If specified, the policies will be added into the policy list after RetryPolicy + :paramtype per_retry_policies: Union[HTTPPolicy, SansIOHTTPPolicy, list[HTTPPolicy], list[SansIOHTTPPolicy]] + :keyword HttpTransport transport: If omitted, RequestsTransport is used for synchronous transport. + """ + + def __init__(self, base_url: str, **kwargs: Any) -> None: + if "policies" not in kwargs: + config = kwargs.get("config") + if not config: + raise ValueError("Current implementation requires to pass 'config' if you don't pass 'policies'") + per_call_policies = kwargs.get("per_call_policies", []) + if isinstance(per_call_policies, MutableSequence): + per_call_policies.append(ARMAutoResourceProviderRegistrationPolicy()) + else: + per_call_policies = [ + per_call_policies, + ARMAutoResourceProviderRegistrationPolicy(), + ] + kwargs["per_call_policies"] = per_call_policies + if not config.http_logging_policy: + config.http_logging_policy = kwargs.get("http_logging_policy", ARMHttpLoggingPolicy(**kwargs)) + kwargs["config"] = config + super(ARMPipelineClient, self).__init__(base_url, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/_version.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/_version.py new file mode 100644 index 00000000..1948e34a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/_version.py @@ -0,0 +1,12 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# +# Code generated by Microsoft (R) AutoRest Code Generator. +# Changes may cause incorrect behavior and will be lost if the code is +# regenerated. +# -------------------------------------------------------------------------- + +VERSION = "1.5.0" diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/exceptions.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/exceptions.py new file mode 100644 index 00000000..e7048db6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/exceptions.py @@ -0,0 +1,85 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import Mapping, Any, Sequence +import json +import logging + + +from azure.core.exceptions import ODataV4Format + + +_LOGGER = logging.getLogger(__name__) + + +class TypedErrorInfo: + """Additional info class defined in ARM specification. + + https://github.com/Azure/azure-resource-manager-rpc/blob/master/v1.0/common-api-details.md#error-response-content + """ + + def __init__(self, type: str, info: Mapping[str, Any]) -> None: # pylint: disable=redefined-builtin + self.type = type + self.info = info + + def __str__(self) -> str: + """Cloud error message. + + :return: The cloud error message. + :rtype: str + """ + error_str = "Type: {}".format(self.type) + error_str += "\nInfo: {}".format(json.dumps(self.info, indent=4)) + return error_str + + +class ARMErrorFormat(ODataV4Format): + """Describe error format from ARM, used at the base or inside "details" node. + + This format is compatible with ODataV4 format. + https://github.com/Azure/azure-resource-manager-rpc/blob/master/v1.0/common-api-details.md#error-response-content + """ + + def __init__(self, json_object: Mapping[str, Any]) -> None: + # Parse the ODatav4 part + super(ARMErrorFormat, self).__init__(json_object) + if "error" in json_object: + json_object = json_object["error"] + + # ARM specific annotations + self.additional_info: Sequence[TypedErrorInfo] = [ + TypedErrorInfo(additional_info["type"], additional_info["info"]) + for additional_info in json_object.get("additionalInfo", []) + ] + + def __str__(self) -> str: + error_str = super(ARMErrorFormat, self).__str__() + + if self.additional_info: + error_str += "\nAdditional Information:" + for error_info in self.additional_info: + error_str += str(error_info) + + return error_str diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/__init__.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/__init__.py new file mode 100644 index 00000000..e79b73f4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/__init__.py @@ -0,0 +1,69 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- + +from azure.core.pipeline.policies import HttpLoggingPolicy +from ._authentication import ( + ARMChallengeAuthenticationPolicy, + AuxiliaryAuthenticationPolicy, +) +from ._base import ARMAutoResourceProviderRegistrationPolicy +from ._authentication_async import ( + AsyncARMChallengeAuthenticationPolicy, + AsyncAuxiliaryAuthenticationPolicy, +) +from ._base_async import AsyncARMAutoResourceProviderRegistrationPolicy + + +class ARMHttpLoggingPolicy(HttpLoggingPolicy): + """HttpLoggingPolicy with ARM specific safe headers fopr loggers.""" + + DEFAULT_HEADERS_ALLOWLIST = HttpLoggingPolicy.DEFAULT_HEADERS_ALLOWLIST | set( + [ + # https://docs.microsoft.com/azure/azure-resource-manager/management/request-limits-and-throttling#remaining-requests + "x-ms-ratelimit-remaining-subscription-reads", + "x-ms-ratelimit-remaining-subscription-writes", + "x-ms-ratelimit-remaining-tenant-reads", + "x-ms-ratelimit-remaining-tenant-writes", + "x-ms-ratelimit-remaining-subscription-resource-requests", + "x-ms-ratelimit-remaining-subscription-resource-entities-read", + "x-ms-ratelimit-remaining-tenant-resource-requests", + "x-ms-ratelimit-remaining-tenant-resource-entities-read", + # https://docs.microsoft.com/azure/virtual-machines/troubleshooting/troubleshooting-throttling-errors#call-rate-informational-response-headers + "x-ms-ratelimit-remaining-resource", + "x-ms-request-charge", + ] + ) + + +__all__ = [ + "ARMAutoResourceProviderRegistrationPolicy", + "ARMChallengeAuthenticationPolicy", + "ARMHttpLoggingPolicy", + "AsyncARMAutoResourceProviderRegistrationPolicy", + "AsyncARMChallengeAuthenticationPolicy", + "AuxiliaryAuthenticationPolicy", + "AsyncAuxiliaryAuthenticationPolicy", +] diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication.py new file mode 100644 index 00000000..c7015b81 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication.py @@ -0,0 +1,182 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import base64 +import time +from typing import Optional, Union, MutableMapping, List, Any, Sequence, TypeVar, Generic + +from azure.core.credentials import AccessToken, TokenCredential +from azure.core.credentials_async import AsyncTokenCredential +from azure.core.pipeline.policies import BearerTokenCredentialPolicy, SansIOHTTPPolicy +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.exceptions import ServiceRequestError +from azure.core.pipeline.transport import ( + HttpRequest as LegacyHttpRequest, + HttpResponse as LegacyHttpResponse, +) +from azure.core.rest import HttpRequest, HttpResponse + + +HTTPRequestType = Union[LegacyHttpRequest, HttpRequest] +HTTPResponseType = Union[LegacyHttpResponse, HttpResponse] +TokenCredentialType = TypeVar("TokenCredentialType", bound=Union[TokenCredential, AsyncTokenCredential]) + + +class ARMChallengeAuthenticationPolicy(BearerTokenCredentialPolicy): + """Adds a bearer token Authorization header to requests. + + This policy internally handles Continuous Access Evaluation (CAE) challenges. When it can't complete a challenge, + it will return the 401 (unauthorized) response from ARM. + + :param ~azure.core.credentials.TokenCredential credential: credential for authorizing requests + :param str scopes: required authentication scopes + """ + + def on_challenge( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, HTTPResponseType], + ) -> bool: + """Authorize request according to an ARM authentication challenge + + :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge + :param ~azure.core.pipeline.PipelineResponse response: ARM's response + :returns: a bool indicating whether the policy should send the request + :rtype: bool + """ + + challenge = response.http_response.headers.get("WWW-Authenticate") + if challenge: + claims = _parse_claims_challenge(challenge) + if claims: + self.authorize_request(request, *self._scopes, claims=claims) + return True + + return False + + +# pylint:disable=too-few-public-methods +class _AuxiliaryAuthenticationPolicyBase(Generic[TokenCredentialType]): + """Adds auxiliary authorization token header to requests. + + :param ~azure.core.credentials.TokenCredential auxiliary_credentials: auxiliary credential for authorizing requests + :param str scopes: required authentication scopes + """ + + def __init__( # pylint: disable=unused-argument + self, auxiliary_credentials: Sequence[TokenCredentialType], *scopes: str, **kwargs: Any + ) -> None: + self._auxiliary_credentials = auxiliary_credentials + self._scopes = scopes + self._aux_tokens: Optional[List[AccessToken]] = None + + @staticmethod + def _enforce_https(request: PipelineRequest[HTTPRequestType]) -> None: + # move 'enforce_https' from options to context, so it persists + # across retries but isn't passed to transport implementation + option = request.context.options.pop("enforce_https", None) + + # True is the default setting; we needn't preserve an explicit opt in to the default behavior + if option is False: + request.context["enforce_https"] = option + + enforce_https = request.context.get("enforce_https", True) + if enforce_https and not request.http_request.url.lower().startswith("https"): + raise ServiceRequestError( + "Bearer token authentication is not permitted for non-TLS protected (non-https) URLs." + ) + + def _update_headers(self, headers: MutableMapping[str, str]) -> None: + """Updates the x-ms-authorization-auxiliary header with the auxiliary token. + + :param dict headers: The HTTP Request headers + """ + if self._aux_tokens: + headers["x-ms-authorization-auxiliary"] = ", ".join( + "Bearer {}".format(token.token) for token in self._aux_tokens + ) + + @property + def _need_new_aux_tokens(self) -> bool: + if not self._aux_tokens: + return True + for token in self._aux_tokens: + if token.expires_on - time.time() < 300: + return True + return False + + +class AuxiliaryAuthenticationPolicy( + _AuxiliaryAuthenticationPolicyBase[TokenCredential], + SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType], +): + def _get_auxiliary_tokens(self, *scopes: str, **kwargs: Any) -> Optional[List[AccessToken]]: + if self._auxiliary_credentials: + return [cred.get_token(*scopes, **kwargs) for cred in self._auxiliary_credentials] + return None + + def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Called before the policy sends a request. + + The base implementation authorizes the request with an auxiliary authorization token. + + :param ~azure.core.pipeline.PipelineRequest request: the request + """ + self._enforce_https(request) + + if self._need_new_aux_tokens: + self._aux_tokens = self._get_auxiliary_tokens(*self._scopes) + + self._update_headers(request.http_request.headers) + + +def _parse_claims_challenge(challenge: str) -> Optional[str]: + """Parse the "claims" parameter from an authentication challenge + + Example challenge with claims: + Bearer authorization_uri="https://login.windows-ppe.net/", error="invalid_token", + error_description="User session has been revoked", + claims="eyJhY2Nlc3NfdG9rZW4iOnsibmJmIjp7ImVzc2VudGlhbCI6dHJ1ZSwgInZhbHVlIjoiMTYwMzc0MjgwMCJ9fX0=" + + :param str challenge: The authentication challenge + :return: the challenge's "claims" parameter or None, if it doesn't contain that parameter + """ + encoded_claims = None + for parameter in challenge.split(","): + if "claims=" in parameter: + if encoded_claims: + # multiple claims challenges, e.g. for cross-tenant auth, would require special handling + return None + encoded_claims = parameter[parameter.index("=") + 1 :].strip(" \"'") + + if not encoded_claims: + return None + + padding_needed = -len(encoded_claims) % 4 + try: + decoded_claims = base64.urlsafe_b64decode(encoded_claims + "=" * padding_needed).decode() + return decoded_claims + except Exception: # pylint:disable=broad-except + return None diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication_async.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication_async.py new file mode 100644 index 00000000..bd93b112 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication_async.py @@ -0,0 +1,162 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import cast, Awaitable, Optional, List, Union, Any +import inspect + +from azure.core.pipeline.policies import ( + AsyncBearerTokenCredentialPolicy, + AsyncHTTPPolicy, +) +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.pipeline.transport import ( + HttpRequest as LegacyHttpRequest, + AsyncHttpResponse as LegacyAsyncHttpResponse, +) +from azure.core.rest import HttpRequest, AsyncHttpResponse +from azure.core.credentials import AccessToken +from azure.core.credentials_async import AsyncTokenCredential + + +from ._authentication import _parse_claims_challenge, _AuxiliaryAuthenticationPolicyBase + + +HTTPRequestType = Union[LegacyHttpRequest, HttpRequest] +AsyncHTTPResponseType = Union[LegacyAsyncHttpResponse, AsyncHttpResponse] + + +async def await_result(func, *args, **kwargs): + """If func returns an awaitable, await it. + + :param callable func: Function to call + :param any args: Positional arguments to pass to func + :return: Result of func + :rtype: any + """ + result = func(*args, **kwargs) + if inspect.isawaitable(result): + return await result + return result + + +class AsyncARMChallengeAuthenticationPolicy(AsyncBearerTokenCredentialPolicy): + """Adds a bearer token Authorization header to requests. + + This policy internally handles Continuous Access Evaluation (CAE) challenges. When it can't complete a challenge, + it will return the 401 (unauthorized) response from ARM. + + :param ~azure.core.credentials.TokenCredential credential: credential for authorizing requests + :param str scopes: required authentication scopes + """ + + # pylint:disable=unused-argument + async def on_challenge( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType], + ) -> bool: + """Authorize request according to an ARM authentication challenge + + :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge + :param ~azure.core.pipeline.PipelineResponse response: the resource provider's response + :returns: a bool indicating whether the policy should send the request + :rtype: bool + """ + # Casting, as the code seems to be certain that on_challenge this header will be present + challenge: str = cast(str, response.http_response.headers.get("WWW-Authenticate")) + claims = _parse_claims_challenge(challenge) + if claims: + await self.authorize_request(request, *self._scopes, claims=claims) + return True + + return False + + +class AsyncAuxiliaryAuthenticationPolicy( + _AuxiliaryAuthenticationPolicyBase[AsyncTokenCredential], + AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType], +): + async def _get_auxiliary_tokens(self, *scopes: str, **kwargs: Any) -> Optional[List[AccessToken]]: + if self._auxiliary_credentials: + return [await cred.get_token(*scopes, **kwargs) for cred in self._auxiliary_credentials] + return None + + async def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Called before the policy sends a request. + + The base implementation authorizes the request with an auxiliary authorization token. + + :param ~azure.core.pipeline.PipelineRequest request: the request + """ + self._enforce_https(request) + + if self._need_new_aux_tokens: + self._aux_tokens = await self._get_auxiliary_tokens(*self._scopes) + + self._update_headers(request.http_request.headers) + + def on_response( + self, + request: PipelineRequest[HTTPRequestType], + response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType], + ) -> Optional[Awaitable[None]]: + """Executed after the request comes back from the next policy. + + :param request: Request to be modified after returning from the policy. + :type request: ~azure.core.pipeline.PipelineRequest + :param response: Pipeline response object + :type response: ~azure.core.pipeline.PipelineResponse + """ + + def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None: + """Executed when an exception is raised while executing the next policy. + + This method is executed inside the exception handler. + + :param request: The Pipeline request object + :type request: ~azure.core.pipeline.PipelineRequest + """ + # pylint: disable=unused-argument + return + + async def send( + self, request: PipelineRequest[HTTPRequestType] + ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]: + """Authorize request with a bearer token and send it to the next policy + + :param request: The pipeline request object + :type request: ~azure.core.pipeline.PipelineRequest + :return: The pipeline response object + :rtype: ~azure.core.pipeline.PipelineResponse + """ + await await_result(self.on_request, request) + try: + response = await self.next.send(request) + await await_result(self.on_response, request, response) + except Exception: # pylint:disable=broad-except + handled = await await_result(self.on_exception, request) + if not handled: + raise + return response diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base.py new file mode 100644 index 00000000..188d6cfc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base.py @@ -0,0 +1,138 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import json +import logging +import re +import time +import uuid +from typing import Union, Optional, cast + +from azure.core.pipeline import PipelineContext, PipelineRequest, PipelineResponse +from azure.core.pipeline.policies import HTTPPolicy +from azure.core.pipeline.transport import ( + HttpRequest as LegacyHttpRequest, + HttpResponse as LegacyHttpResponse, + AsyncHttpResponse as LegacyAsyncHttpResponse, +) +from azure.core.rest import HttpRequest, HttpResponse, AsyncHttpResponse + + +_LOGGER = logging.getLogger(__name__) + +HTTPRequestType = Union[LegacyHttpRequest, HttpRequest] +HTTPResponseType = Union[LegacyHttpResponse, HttpResponse] +AllHttpResponseType = Union[ + LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse +] # Sync or async + + +class _SansIOARMAutoResourceProviderRegistrationPolicy: + @staticmethod + def _check_rp_not_registered_err(response: PipelineResponse[HTTPRequestType, AllHttpResponseType]) -> Optional[str]: + try: + response_as_json = json.loads(response.http_response.text()) + if response_as_json["error"]["code"] == "MissingSubscriptionRegistration": + # While "match" can in theory be None, if we saw "MissingSubscriptionRegistration" it won't happen + match = cast(re.Match, re.match(r".*'(.*)'", response_as_json["error"]["message"])) + return match.group(1) + except Exception: # pylint: disable=broad-except + pass + return None + + @staticmethod + def _extract_subscription_url(url: str) -> str: + """Extract the first part of the URL, just after subscription: + https://management.azure.com/subscriptions/00000000-0000-0000-0000-000000000000/ + + :param str url: The URL to extract the subscription ID from + :return: The subscription ID + :rtype: str + """ + match = re.match(r".*/subscriptions/[a-f0-9-]+/", url, re.IGNORECASE) + if not match: + raise ValueError("Unable to extract subscription ID from URL") + return match.group(0) + + @staticmethod + def _build_next_request( + initial_request: PipelineRequest[HTTPRequestType], method: str, url: str + ) -> PipelineRequest[HTTPRequestType]: + request = HttpRequest(method, url) + context = PipelineContext(initial_request.context.transport, **initial_request.context.options) + return PipelineRequest(request, context) + + +class ARMAutoResourceProviderRegistrationPolicy( + _SansIOARMAutoResourceProviderRegistrationPolicy, HTTPPolicy[HTTPRequestType, HTTPResponseType] +): # pylint: disable=name-too-long + """Auto register an ARM resource provider if not done yet.""" + + def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]: + http_request = request.http_request + response = self.next.send(request) + if response.http_response.status_code == 409: + rp_name = self._check_rp_not_registered_err(response) + if rp_name: + url_prefix = self._extract_subscription_url(http_request.url) + if not self._register_rp(request, url_prefix, rp_name): + return response + # Change the 'x-ms-client-request-id' otherwise the Azure endpoint + # just returns the same 409 payload without looking at the actual query + if "x-ms-client-request-id" in http_request.headers: + http_request.headers["x-ms-client-request-id"] = str(uuid.uuid4()) + response = self.next.send(request) + return response + + def _register_rp(self, initial_request: PipelineRequest[HTTPRequestType], url_prefix: str, rp_name: str) -> bool: + """Synchronously register the RP is paremeter. + + Return False if we have a reason to believe this didn't work + + :param initial_request: The initial request + :type initial_request: ~azure.core.pipeline.PipelineRequest + :param str url_prefix: The url prefix + :param str rp_name: The resource provider name + :return: Return False if we have a reason to believe this didn't work + :rtype: bool + """ + post_url = "{}providers/{}/register?api-version=2016-02-01".format(url_prefix, rp_name) + get_url = "{}providers/{}?api-version=2016-02-01".format(url_prefix, rp_name) + _LOGGER.warning( + "Resource provider '%s' used by this operation is not registered. We are registering for you.", + rp_name, + ) + post_response = self.next.send(self._build_next_request(initial_request, "POST", post_url)) + if post_response.http_response.status_code != 200: + _LOGGER.warning("Registration failed. Please register manually.") + return False + + while True: + time.sleep(10) + get_response = self.next.send(self._build_next_request(initial_request, "GET", get_url)) + rp_info = json.loads(get_response.http_response.text()) + if rp_info["registrationState"] == "Registered": + _LOGGER.warning("Registration succeeded.") + return True diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base_async.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base_async.py new file mode 100644 index 00000000..8ea7d397 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base_async.py @@ -0,0 +1,105 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import asyncio +import json +import logging +import uuid +from typing import Union + +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.pipeline.policies import AsyncHTTPPolicy +from azure.core.pipeline.transport import ( + HttpRequest as LegacyHttpRequest, + AsyncHttpResponse as LegacyAsyncHttpResponse, +) +from azure.core.rest import HttpRequest, AsyncHttpResponse + + +from ._base import _SansIOARMAutoResourceProviderRegistrationPolicy + +_LOGGER = logging.getLogger(__name__) + +HTTPRequestType = Union[LegacyHttpRequest, HttpRequest] +AsyncHTTPResponseType = Union[LegacyAsyncHttpResponse, AsyncHttpResponse] +PipelineResponseType = PipelineResponse[HTTPRequestType, AsyncHTTPResponseType] + + +class AsyncARMAutoResourceProviderRegistrationPolicy( + _SansIOARMAutoResourceProviderRegistrationPolicy, AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType] +): # pylint: disable=name-too-long + """Auto register an ARM resource provider if not done yet.""" + + async def send( # pylint: disable=invalid-overridden-method + self, request: PipelineRequest[HTTPRequestType] + ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]: + http_request = request.http_request + response = await self.next.send(request) + if response.http_response.status_code == 409: + rp_name = self._check_rp_not_registered_err(response) + if rp_name: + url_prefix = self._extract_subscription_url(http_request.url) + register_rp_status = await self._async_register_rp(request, url_prefix, rp_name) + if not register_rp_status: + return response + # Change the 'x-ms-client-request-id' otherwise the Azure endpoint + # just returns the same 409 payload without looking at the actual query + if "x-ms-client-request-id" in http_request.headers: + http_request.headers["x-ms-client-request-id"] = str(uuid.uuid4()) + response = await self.next.send(request) + return response + + async def _async_register_rp( + self, initial_request: PipelineRequest[HTTPRequestType], url_prefix: str, rp_name: str + ) -> bool: + """Synchronously register the RP is paremeter. + + Return False if we have a reason to believe this didn't work + + :param initial_request: The initial request + :type initial_request: ~azure.core.pipeline.PipelineRequest + :param str url_prefix: The url prefix + :param str rp_name: The resource provider name + :return: Return False if we have a reason to believe this didn't work + :rtype: bool + """ + post_url = "{}providers/{}/register?api-version=2016-02-01".format(url_prefix, rp_name) + get_url = "{}providers/{}?api-version=2016-02-01".format(url_prefix, rp_name) + _LOGGER.warning( + "Resource provider '%s' used by this operation is not registered. We are registering for you.", + rp_name, + ) + post_response = await self.next.send(self._build_next_request(initial_request, "POST", post_url)) + if post_response.http_response.status_code != 200: + _LOGGER.warning("Registration failed. Please register manually.") + return False + + while True: + await asyncio.sleep(10) + get_response = await self.next.send(self._build_next_request(initial_request, "GET", get_url)) + rp_info = json.loads(get_response.http_response.text()) + if rp_info["registrationState"] == "Registered": + _LOGGER.warning("Registration succeeded.") + return True diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/__init__.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/__init__.py new file mode 100644 index 00000000..c36aaed1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/__init__.py @@ -0,0 +1,25 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/arm_polling.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/arm_polling.py new file mode 100644 index 00000000..a1784461 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/arm_polling.py @@ -0,0 +1,216 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from enum import Enum +from typing import Optional, Union, TypeVar, Dict, Any, Sequence + +from azure.core import CaseInsensitiveEnumMeta +from azure.core.polling.base_polling import ( + LongRunningOperation, + LROBasePolling, + OperationFailed, + BadResponse, + OperationResourcePolling, + LocationPolling, + StatusCheckPolling, + _as_json, + _is_empty, +) + +from azure.core.pipeline import PipelineResponse +from azure.core.pipeline.transport import ( + HttpRequest as LegacyHttpRequest, + HttpResponse as LegacyHttpResponse, + AsyncHttpResponse as LegacyAsyncHttpResponse, +) +from azure.core.rest import HttpRequest, HttpResponse, AsyncHttpResponse + +ResponseType = Union[HttpResponse, AsyncHttpResponse] +PipelineResponseType = PipelineResponse[HttpRequest, ResponseType] +HttpRequestType = Union[LegacyHttpRequest, HttpRequest] +AllHttpResponseType = Union[ + LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse +] # Sync or async +HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType) +AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType) # Sync or async + + +class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta): + """Known LRO options from Swagger.""" + + FINAL_STATE_VIA = "final-state-via" + + +class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta): + """Possible final-state-via options.""" + + AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation" + LOCATION_FINAL_STATE = "location" + + +class AzureAsyncOperationPolling(OperationResourcePolling[HttpRequestTypeVar, AllHttpResponseTypeVar]): + """Implements a operation resource polling, typically from Azure-AsyncOperation.""" + + def __init__(self, lro_options: Optional[Dict[str, Any]] = None) -> None: + super(AzureAsyncOperationPolling, self).__init__(operation_location_header="azure-asyncoperation") + + self._lro_options = lro_options or {} + + def get_final_get_url( + self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar] + ) -> Optional[str]: + """If a final GET is needed, returns the URL. + + :param ~azure.core.pipeline.PipelineResponse pipeline_response: The pipeline response object. + :return: The URL to poll for the final GET. + :rtype: str + """ + if ( + self._lro_options.get(_LroOption.FINAL_STATE_VIA) == _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE + and self._request.method == "POST" + ): + return None + return super(AzureAsyncOperationPolling, self).get_final_get_url(pipeline_response) + + +class BodyContentPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]): + """Poll based on the body content. + + Implement a ARM resource poller (using provisioning state). + """ + + _initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar] + """Store the initial response.""" + + def can_poll(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> bool: + """Answer if this polling method could be used. + + :param ~azure.core.pipeline.PipelineResponse pipeline_response: The pipeline response object. + :return: True if this polling method could be used. + :rtype: bool + """ + response = pipeline_response.http_response + return response.request.method in ["PUT", "PATCH"] + + def get_polling_url(self) -> str: + """Return the polling URL. + :return: The polling URL. + :rtype: str + """ + return self._initial_response.http_response.request.url + + def get_final_get_url(self, pipeline_response: Any) -> None: + """If a final GET is needed, returns the URL. + + :param ~azure.core.pipeline.PipelineResponse pipeline_response: The pipeline response object. + :return: The URL to poll for the final GET. + :rtype: str + """ + return None + + def set_initial_status( + self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar] + ) -> str: + """Process first response after initiating long running operation. + + :param ~azure.core.pipeline.PipelineResponse pipeline_response: initial REST call response. + :return: Status string. + :rtype: str + """ + self._initial_response = pipeline_response + response = pipeline_response.http_response + + if response.status_code == 202: + return "InProgress" + if response.status_code == 201: + status = self._get_provisioning_state(response) + return status or "InProgress" + if response.status_code == 200: + status = self._get_provisioning_state(response) + return status or "Succeeded" + if response.status_code == 204: + return "Succeeded" + + raise OperationFailed("Invalid status found") + + @staticmethod + def _get_provisioning_state(response: AllHttpResponseTypeVar) -> Optional[str]: + """Attempt to get provisioning state from resource. + + :param azure.core.pipeline.transport.HttpResponse response: latest REST call response. + :returns: Status if found, else 'None'. + :rtype: str or None + """ + if _is_empty(response): + return None + body = _as_json(response) + return body.get("properties", {}).get("provisioningState") + + def get_status(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> str: + """Process the latest status update retrieved from the same URL as + the previous request. + + :param ~azure.core.pipeline.PipelineResponse pipeline_response: latest REST call response. + :return: Status string. + :rtype: str + :raises: BadResponse if status not 200 or 204. + """ + response = pipeline_response.http_response + if _is_empty(response): + raise BadResponse("The response from long running operation does not contain a body.") + + status = self._get_provisioning_state(response) + return status or "Succeeded" + + +class ARMPolling(LROBasePolling): + def __init__( + self, + timeout: float = 30, + lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None, + lro_options: Optional[Dict[str, Any]] = None, + path_format_arguments: Optional[Dict[str, str]] = None, + **operation_config: Any + ) -> None: + lro_algorithms = lro_algorithms or [ + AzureAsyncOperationPolling(lro_options=lro_options), + LocationPolling(), + BodyContentPolling(), + StatusCheckPolling(), + ] + super(ARMPolling, self).__init__( + timeout=timeout, + lro_algorithms=lro_algorithms, + lro_options=lro_options, + path_format_arguments=path_format_arguments, + **operation_config + ) + + +__all__ = [ + "AzureAsyncOperationPolling", + "BodyContentPolling", + "ARMPolling", +] diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/async_arm_polling.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/async_arm_polling.py new file mode 100644 index 00000000..38c12387 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/polling/async_arm_polling.py @@ -0,0 +1,58 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import Optional, Dict, Any, Sequence + +from azure.core.polling.base_polling import LocationPolling, StatusCheckPolling, LongRunningOperation +from azure.core.polling.async_base_polling import AsyncLROBasePolling + +from .arm_polling import AzureAsyncOperationPolling, BodyContentPolling, HttpRequestTypeVar, AllHttpResponseTypeVar + + +class AsyncARMPolling(AsyncLROBasePolling): + def __init__( + self, + timeout: float = 30, + lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None, + lro_options: Optional[Dict[str, Any]] = None, + path_format_arguments: Optional[Dict[str, str]] = None, + **operation_config: Any + ) -> None: + lro_algorithms = lro_algorithms or [ + AzureAsyncOperationPolling(lro_options=lro_options), + LocationPolling(), + BodyContentPolling(), + StatusCheckPolling(), + ] + super(AsyncLROBasePolling, self).__init__( + timeout=timeout, + lro_algorithms=lro_algorithms, + lro_options=lro_options, + path_format_arguments=path_format_arguments, + **operation_config + ) + + +__all__ = ["AsyncARMPolling"] diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/py.typed b/.venv/lib/python3.12/site-packages/azure/mgmt/core/py.typed new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/py.typed diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/tools.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/tools.py new file mode 100644 index 00000000..aee742fe --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/tools.py @@ -0,0 +1,247 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import Mapping, MutableMapping, Optional, Type, Union, cast, Dict, Any +import re +import logging +from azure.core import AzureClouds + + +_LOGGER = logging.getLogger(__name__) +_ARMID_RE = re.compile( + "(?i)/subscriptions/(?P<subscription>[^/]+)(/resourceGroups/(?P<resource_group>[^/]+))?" + + "(/providers/(?P<namespace>[^/]+)/(?P<type>[^/]*)/(?P<name>[^/]+)(?P<children>.*))?" +) + +_CHILDREN_RE = re.compile( + "(?i)(/providers/(?P<child_namespace>[^/]+))?/" + "(?P<child_type>[^/]*)/(?P<child_name>[^/]+)" +) + +_ARMNAME_RE = re.compile("^[^<>%&:\\?/]{1,260}$") + + +__all__ = [ + "parse_resource_id", + "resource_id", + "is_valid_resource_id", + "is_valid_resource_name", + "get_arm_endpoints", +] + + +def parse_resource_id(rid: str) -> Mapping[str, Union[str, int]]: + """Parses a resource_id into its various parts. + + Returns a dictionary with a single key-value pair, 'name': rid, if invalid resource id. + + :param rid: The resource id being parsed + :type rid: str + :returns: A dictionary with with following key/value pairs (if found): + + - subscription: Subscription id + - resource_group: Name of resource group + - namespace: Namespace for the resource provider (i.e. Microsoft.Compute) + - type: Type of the root resource (i.e. virtualMachines) + - name: Name of the root resource + - child_namespace_{level}: Namespace for the child resource of that level + - child_type_{level}: Type of the child resource of that level + - child_name_{level}: Name of the child resource of that level + - last_child_num: Level of the last child + - resource_parent: Computed parent in the following pattern: providers/{namespace}\ + /{parent}/{type}/{name} + - resource_namespace: Same as namespace. Note that this may be different than the \ + target resource's namespace. + - resource_type: Type of the target resource (not the parent) + - resource_name: Name of the target resource (not the parent) + + :rtype: dict[str,str] + """ + if not rid: + return {} + match = _ARMID_RE.match(rid) + if match: + result: MutableMapping[str, Union[None, str, int]] = match.groupdict() + children = _CHILDREN_RE.finditer(cast(Optional[str], result["children"]) or "") + count = None + for count, child in enumerate(children): + result.update({key + "_%d" % (count + 1): group for key, group in child.groupdict().items()}) + result["last_child_num"] = count + 1 if isinstance(count, int) else None + final_result = _populate_alternate_kwargs(result) + else: + final_result = result = {"name": rid} + return {key: value for key, value in final_result.items() if value is not None} + + +def _populate_alternate_kwargs( + kwargs: MutableMapping[str, Union[None, str, int]] +) -> Mapping[str, Union[None, str, int]]: + """Translates the parsed arguments into a format used by generic ARM commands + such as the resource and lock commands. + + :param any kwargs: The parsed arguments + :return: The translated arguments + :rtype: any + """ + + resource_namespace = kwargs["namespace"] + resource_type = kwargs.get("child_type_{}".format(kwargs["last_child_num"])) or kwargs["type"] + resource_name = kwargs.get("child_name_{}".format(kwargs["last_child_num"])) or kwargs["name"] + + _get_parents_from_parts(kwargs) + kwargs["resource_namespace"] = resource_namespace + kwargs["resource_type"] = resource_type + kwargs["resource_name"] = resource_name + return kwargs + + +def _get_parents_from_parts(kwargs: MutableMapping[str, Union[None, str, int]]) -> Mapping[str, Union[None, str, int]]: + """Get the parents given all the children parameters. + + :param any kwargs: The children parameters + :return: The parents + :rtype: any + """ + parent_builder = [] + if kwargs["last_child_num"] is not None: + parent_builder.append("{type}/{name}/".format(**kwargs)) + for index in range(1, cast(int, kwargs["last_child_num"])): + child_namespace = kwargs.get("child_namespace_{}".format(index)) + if child_namespace is not None: + parent_builder.append("providers/{}/".format(child_namespace)) + kwargs["child_parent_{}".format(index)] = "".join(parent_builder) + parent_builder.append("{{child_type_{0}}}/{{child_name_{0}}}/".format(index).format(**kwargs)) + child_namespace = kwargs.get("child_namespace_{}".format(kwargs["last_child_num"])) + if child_namespace is not None: + parent_builder.append("providers/{}/".format(child_namespace)) + kwargs["child_parent_{}".format(kwargs["last_child_num"])] = "".join(parent_builder) + kwargs["resource_parent"] = "".join(parent_builder) if kwargs["name"] else None + return kwargs + + +def resource_id(**kwargs: Optional[str]) -> str: # pylint: disable=docstring-keyword-should-match-keyword-only + """Create a valid resource id string from the given parts. + + This method builds the resource id from the left until the next required id parameter + to be appended is not found. It then returns the built up id. + + :keyword str subscription: (required) Subscription id + :keyword str resource_group: Name of resource group + :keyword str namespace: Namespace for the resource provider (i.e. Microsoft.Compute) + :keyword str type: Type of the resource (i.e. virtualMachines) + :keyword str name: Name of the resource (or parent if child_name is also specified) + :keyword str child_namespace_{level}: Namespace for the child resource of that level (optional) + :keyword str child_type_{level}: Type of the child resource of that level + :keyword str child_name_{level}: Name of the child resource of that level + + :returns: A resource id built from the given arguments. + :rtype: str + """ + kwargs = {k: v for k, v in kwargs.items() if v is not None} + rid_builder = ["/subscriptions/{subscription}".format(**kwargs)] + try: + try: + rid_builder.append("resourceGroups/{resource_group}".format(**kwargs)) + except KeyError: + pass + rid_builder.append("providers/{namespace}".format(**kwargs)) + rid_builder.append("{type}/{name}".format(**kwargs)) + count = 1 + while True: + try: + rid_builder.append("providers/{{child_namespace_{}}}".format(count).format(**kwargs)) + except KeyError: + pass + rid_builder.append("{{child_type_{0}}}/{{child_name_{0}}}".format(count).format(**kwargs)) + count += 1 + except KeyError: + pass + return "/".join(rid_builder) + + +def is_valid_resource_id(rid: str, exception_type: Optional[Type[BaseException]] = None) -> bool: + """Validates the given resource id. + + :param rid: The resource id being validated. + :type rid: str + :param exception_type: Raises this Exception if invalid. + :type exception_type: Exception + :returns: A boolean describing whether the id is valid. + :rtype: bool + """ + is_valid: bool = False + try: + # Ideally, we would make a TypedDict here, but keeping this file simple for now. + is_valid = rid and resource_id(**parse_resource_id(rid)).lower() == rid.lower() # type: ignore + except KeyError: + pass + if not is_valid and exception_type: + raise exception_type() + return is_valid + + +def is_valid_resource_name(rname: str, exception_type: Optional[Type[BaseException]] = None) -> bool: + """Validates the given resource name to ARM guidelines, individual services may be more restrictive. + + :param rname: The resource name being validated. + :type rname: str + :param exception_type: Raises this Exception if invalid. + :type exception_type: Exception + :returns: A boolean describing whether the name is valid. + :rtype: bool + """ + + match = _ARMNAME_RE.match(rname) + + if match: + return True + if exception_type: + raise exception_type() + return False + + +def get_arm_endpoints(cloud_setting: AzureClouds) -> Dict[str, Any]: + """Get the ARM endpoint and ARM credential scopes for the given cloud setting. + + :param cloud_setting: The cloud setting for which to get the ARM endpoint. + :type cloud_setting: AzureClouds + :return: The ARM endpoint and ARM credential scopes. + :rtype: dict[str, Any] + """ + if cloud_setting == AzureClouds.AZURE_CHINA_CLOUD: + return { + "resource_manager": "https://management.chinacloudapi.cn/", + "credential_scopes": ["https://management.chinacloudapi.cn/.default"], + } + if cloud_setting == AzureClouds.AZURE_US_GOVERNMENT: + return { + "resource_manager": "https://management.usgovcloudapi.net/", + "credential_scopes": ["https://management.core.usgovcloudapi.net/.default"], + } + if cloud_setting == AzureClouds.AZURE_PUBLIC_CLOUD: + return { + "resource_manager": "https://management.azure.com/", + "credential_scopes": ["https://management.azure.com/.default"], + } + raise ValueError("Unknown cloud setting: {}".format(cloud_setting)) |