aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/mgmt/core/policies')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/__init__.py69
-rw-r--r--.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication.py182
-rw-r--r--.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication_async.py162
-rw-r--r--.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base.py138
-rw-r--r--.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base_async.py105
5 files changed, 656 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/__init__.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/__init__.py
new file mode 100644
index 00000000..e79b73f4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/__init__.py
@@ -0,0 +1,69 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+
+from azure.core.pipeline.policies import HttpLoggingPolicy
+from ._authentication import (
+ ARMChallengeAuthenticationPolicy,
+ AuxiliaryAuthenticationPolicy,
+)
+from ._base import ARMAutoResourceProviderRegistrationPolicy
+from ._authentication_async import (
+ AsyncARMChallengeAuthenticationPolicy,
+ AsyncAuxiliaryAuthenticationPolicy,
+)
+from ._base_async import AsyncARMAutoResourceProviderRegistrationPolicy
+
+
+class ARMHttpLoggingPolicy(HttpLoggingPolicy):
+ """HttpLoggingPolicy with ARM specific safe headers fopr loggers."""
+
+ DEFAULT_HEADERS_ALLOWLIST = HttpLoggingPolicy.DEFAULT_HEADERS_ALLOWLIST | set(
+ [
+ # https://docs.microsoft.com/azure/azure-resource-manager/management/request-limits-and-throttling#remaining-requests
+ "x-ms-ratelimit-remaining-subscription-reads",
+ "x-ms-ratelimit-remaining-subscription-writes",
+ "x-ms-ratelimit-remaining-tenant-reads",
+ "x-ms-ratelimit-remaining-tenant-writes",
+ "x-ms-ratelimit-remaining-subscription-resource-requests",
+ "x-ms-ratelimit-remaining-subscription-resource-entities-read",
+ "x-ms-ratelimit-remaining-tenant-resource-requests",
+ "x-ms-ratelimit-remaining-tenant-resource-entities-read",
+ # https://docs.microsoft.com/azure/virtual-machines/troubleshooting/troubleshooting-throttling-errors#call-rate-informational-response-headers
+ "x-ms-ratelimit-remaining-resource",
+ "x-ms-request-charge",
+ ]
+ )
+
+
+__all__ = [
+ "ARMAutoResourceProviderRegistrationPolicy",
+ "ARMChallengeAuthenticationPolicy",
+ "ARMHttpLoggingPolicy",
+ "AsyncARMAutoResourceProviderRegistrationPolicy",
+ "AsyncARMChallengeAuthenticationPolicy",
+ "AuxiliaryAuthenticationPolicy",
+ "AsyncAuxiliaryAuthenticationPolicy",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication.py
new file mode 100644
index 00000000..c7015b81
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication.py
@@ -0,0 +1,182 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import base64
+import time
+from typing import Optional, Union, MutableMapping, List, Any, Sequence, TypeVar, Generic
+
+from azure.core.credentials import AccessToken, TokenCredential
+from azure.core.credentials_async import AsyncTokenCredential
+from azure.core.pipeline.policies import BearerTokenCredentialPolicy, SansIOHTTPPolicy
+from azure.core.pipeline import PipelineRequest, PipelineResponse
+from azure.core.exceptions import ServiceRequestError
+from azure.core.pipeline.transport import (
+ HttpRequest as LegacyHttpRequest,
+ HttpResponse as LegacyHttpResponse,
+)
+from azure.core.rest import HttpRequest, HttpResponse
+
+
+HTTPRequestType = Union[LegacyHttpRequest, HttpRequest]
+HTTPResponseType = Union[LegacyHttpResponse, HttpResponse]
+TokenCredentialType = TypeVar("TokenCredentialType", bound=Union[TokenCredential, AsyncTokenCredential])
+
+
+class ARMChallengeAuthenticationPolicy(BearerTokenCredentialPolicy):
+ """Adds a bearer token Authorization header to requests.
+
+ This policy internally handles Continuous Access Evaluation (CAE) challenges. When it can't complete a challenge,
+ it will return the 401 (unauthorized) response from ARM.
+
+ :param ~azure.core.credentials.TokenCredential credential: credential for authorizing requests
+ :param str scopes: required authentication scopes
+ """
+
+ def on_challenge(
+ self,
+ request: PipelineRequest[HTTPRequestType],
+ response: PipelineResponse[HTTPRequestType, HTTPResponseType],
+ ) -> bool:
+ """Authorize request according to an ARM authentication challenge
+
+ :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge
+ :param ~azure.core.pipeline.PipelineResponse response: ARM's response
+ :returns: a bool indicating whether the policy should send the request
+ :rtype: bool
+ """
+
+ challenge = response.http_response.headers.get("WWW-Authenticate")
+ if challenge:
+ claims = _parse_claims_challenge(challenge)
+ if claims:
+ self.authorize_request(request, *self._scopes, claims=claims)
+ return True
+
+ return False
+
+
+# pylint:disable=too-few-public-methods
+class _AuxiliaryAuthenticationPolicyBase(Generic[TokenCredentialType]):
+ """Adds auxiliary authorization token header to requests.
+
+ :param ~azure.core.credentials.TokenCredential auxiliary_credentials: auxiliary credential for authorizing requests
+ :param str scopes: required authentication scopes
+ """
+
+ def __init__( # pylint: disable=unused-argument
+ self, auxiliary_credentials: Sequence[TokenCredentialType], *scopes: str, **kwargs: Any
+ ) -> None:
+ self._auxiliary_credentials = auxiliary_credentials
+ self._scopes = scopes
+ self._aux_tokens: Optional[List[AccessToken]] = None
+
+ @staticmethod
+ def _enforce_https(request: PipelineRequest[HTTPRequestType]) -> None:
+ # move 'enforce_https' from options to context, so it persists
+ # across retries but isn't passed to transport implementation
+ option = request.context.options.pop("enforce_https", None)
+
+ # True is the default setting; we needn't preserve an explicit opt in to the default behavior
+ if option is False:
+ request.context["enforce_https"] = option
+
+ enforce_https = request.context.get("enforce_https", True)
+ if enforce_https and not request.http_request.url.lower().startswith("https"):
+ raise ServiceRequestError(
+ "Bearer token authentication is not permitted for non-TLS protected (non-https) URLs."
+ )
+
+ def _update_headers(self, headers: MutableMapping[str, str]) -> None:
+ """Updates the x-ms-authorization-auxiliary header with the auxiliary token.
+
+ :param dict headers: The HTTP Request headers
+ """
+ if self._aux_tokens:
+ headers["x-ms-authorization-auxiliary"] = ", ".join(
+ "Bearer {}".format(token.token) for token in self._aux_tokens
+ )
+
+ @property
+ def _need_new_aux_tokens(self) -> bool:
+ if not self._aux_tokens:
+ return True
+ for token in self._aux_tokens:
+ if token.expires_on - time.time() < 300:
+ return True
+ return False
+
+
+class AuxiliaryAuthenticationPolicy(
+ _AuxiliaryAuthenticationPolicyBase[TokenCredential],
+ SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType],
+):
+ def _get_auxiliary_tokens(self, *scopes: str, **kwargs: Any) -> Optional[List[AccessToken]]:
+ if self._auxiliary_credentials:
+ return [cred.get_token(*scopes, **kwargs) for cred in self._auxiliary_credentials]
+ return None
+
+ def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
+ """Called before the policy sends a request.
+
+ The base implementation authorizes the request with an auxiliary authorization token.
+
+ :param ~azure.core.pipeline.PipelineRequest request: the request
+ """
+ self._enforce_https(request)
+
+ if self._need_new_aux_tokens:
+ self._aux_tokens = self._get_auxiliary_tokens(*self._scopes)
+
+ self._update_headers(request.http_request.headers)
+
+
+def _parse_claims_challenge(challenge: str) -> Optional[str]:
+ """Parse the "claims" parameter from an authentication challenge
+
+ Example challenge with claims:
+ Bearer authorization_uri="https://login.windows-ppe.net/", error="invalid_token",
+ error_description="User session has been revoked",
+ claims="eyJhY2Nlc3NfdG9rZW4iOnsibmJmIjp7ImVzc2VudGlhbCI6dHJ1ZSwgInZhbHVlIjoiMTYwMzc0MjgwMCJ9fX0="
+
+ :param str challenge: The authentication challenge
+ :return: the challenge's "claims" parameter or None, if it doesn't contain that parameter
+ """
+ encoded_claims = None
+ for parameter in challenge.split(","):
+ if "claims=" in parameter:
+ if encoded_claims:
+ # multiple claims challenges, e.g. for cross-tenant auth, would require special handling
+ return None
+ encoded_claims = parameter[parameter.index("=") + 1 :].strip(" \"'")
+
+ if not encoded_claims:
+ return None
+
+ padding_needed = -len(encoded_claims) % 4
+ try:
+ decoded_claims = base64.urlsafe_b64decode(encoded_claims + "=" * padding_needed).decode()
+ return decoded_claims
+ except Exception: # pylint:disable=broad-except
+ return None
diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication_async.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication_async.py
new file mode 100644
index 00000000..bd93b112
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_authentication_async.py
@@ -0,0 +1,162 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from typing import cast, Awaitable, Optional, List, Union, Any
+import inspect
+
+from azure.core.pipeline.policies import (
+ AsyncBearerTokenCredentialPolicy,
+ AsyncHTTPPolicy,
+)
+from azure.core.pipeline import PipelineRequest, PipelineResponse
+from azure.core.pipeline.transport import (
+ HttpRequest as LegacyHttpRequest,
+ AsyncHttpResponse as LegacyAsyncHttpResponse,
+)
+from azure.core.rest import HttpRequest, AsyncHttpResponse
+from azure.core.credentials import AccessToken
+from azure.core.credentials_async import AsyncTokenCredential
+
+
+from ._authentication import _parse_claims_challenge, _AuxiliaryAuthenticationPolicyBase
+
+
+HTTPRequestType = Union[LegacyHttpRequest, HttpRequest]
+AsyncHTTPResponseType = Union[LegacyAsyncHttpResponse, AsyncHttpResponse]
+
+
+async def await_result(func, *args, **kwargs):
+ """If func returns an awaitable, await it.
+
+ :param callable func: Function to call
+ :param any args: Positional arguments to pass to func
+ :return: Result of func
+ :rtype: any
+ """
+ result = func(*args, **kwargs)
+ if inspect.isawaitable(result):
+ return await result
+ return result
+
+
+class AsyncARMChallengeAuthenticationPolicy(AsyncBearerTokenCredentialPolicy):
+ """Adds a bearer token Authorization header to requests.
+
+ This policy internally handles Continuous Access Evaluation (CAE) challenges. When it can't complete a challenge,
+ it will return the 401 (unauthorized) response from ARM.
+
+ :param ~azure.core.credentials.TokenCredential credential: credential for authorizing requests
+ :param str scopes: required authentication scopes
+ """
+
+ # pylint:disable=unused-argument
+ async def on_challenge(
+ self,
+ request: PipelineRequest[HTTPRequestType],
+ response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType],
+ ) -> bool:
+ """Authorize request according to an ARM authentication challenge
+
+ :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge
+ :param ~azure.core.pipeline.PipelineResponse response: the resource provider's response
+ :returns: a bool indicating whether the policy should send the request
+ :rtype: bool
+ """
+ # Casting, as the code seems to be certain that on_challenge this header will be present
+ challenge: str = cast(str, response.http_response.headers.get("WWW-Authenticate"))
+ claims = _parse_claims_challenge(challenge)
+ if claims:
+ await self.authorize_request(request, *self._scopes, claims=claims)
+ return True
+
+ return False
+
+
+class AsyncAuxiliaryAuthenticationPolicy(
+ _AuxiliaryAuthenticationPolicyBase[AsyncTokenCredential],
+ AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType],
+):
+ async def _get_auxiliary_tokens(self, *scopes: str, **kwargs: Any) -> Optional[List[AccessToken]]:
+ if self._auxiliary_credentials:
+ return [await cred.get_token(*scopes, **kwargs) for cred in self._auxiliary_credentials]
+ return None
+
+ async def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
+ """Called before the policy sends a request.
+
+ The base implementation authorizes the request with an auxiliary authorization token.
+
+ :param ~azure.core.pipeline.PipelineRequest request: the request
+ """
+ self._enforce_https(request)
+
+ if self._need_new_aux_tokens:
+ self._aux_tokens = await self._get_auxiliary_tokens(*self._scopes)
+
+ self._update_headers(request.http_request.headers)
+
+ def on_response(
+ self,
+ request: PipelineRequest[HTTPRequestType],
+ response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType],
+ ) -> Optional[Awaitable[None]]:
+ """Executed after the request comes back from the next policy.
+
+ :param request: Request to be modified after returning from the policy.
+ :type request: ~azure.core.pipeline.PipelineRequest
+ :param response: Pipeline response object
+ :type response: ~azure.core.pipeline.PipelineResponse
+ """
+
+ def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None:
+ """Executed when an exception is raised while executing the next policy.
+
+ This method is executed inside the exception handler.
+
+ :param request: The Pipeline request object
+ :type request: ~azure.core.pipeline.PipelineRequest
+ """
+ # pylint: disable=unused-argument
+ return
+
+ async def send(
+ self, request: PipelineRequest[HTTPRequestType]
+ ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]:
+ """Authorize request with a bearer token and send it to the next policy
+
+ :param request: The pipeline request object
+ :type request: ~azure.core.pipeline.PipelineRequest
+ :return: The pipeline response object
+ :rtype: ~azure.core.pipeline.PipelineResponse
+ """
+ await await_result(self.on_request, request)
+ try:
+ response = await self.next.send(request)
+ await await_result(self.on_response, request, response)
+ except Exception: # pylint:disable=broad-except
+ handled = await await_result(self.on_exception, request)
+ if not handled:
+ raise
+ return response
diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base.py
new file mode 100644
index 00000000..188d6cfc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base.py
@@ -0,0 +1,138 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import json
+import logging
+import re
+import time
+import uuid
+from typing import Union, Optional, cast
+
+from azure.core.pipeline import PipelineContext, PipelineRequest, PipelineResponse
+from azure.core.pipeline.policies import HTTPPolicy
+from azure.core.pipeline.transport import (
+ HttpRequest as LegacyHttpRequest,
+ HttpResponse as LegacyHttpResponse,
+ AsyncHttpResponse as LegacyAsyncHttpResponse,
+)
+from azure.core.rest import HttpRequest, HttpResponse, AsyncHttpResponse
+
+
+_LOGGER = logging.getLogger(__name__)
+
+HTTPRequestType = Union[LegacyHttpRequest, HttpRequest]
+HTTPResponseType = Union[LegacyHttpResponse, HttpResponse]
+AllHttpResponseType = Union[
+ LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
+] # Sync or async
+
+
+class _SansIOARMAutoResourceProviderRegistrationPolicy:
+ @staticmethod
+ def _check_rp_not_registered_err(response: PipelineResponse[HTTPRequestType, AllHttpResponseType]) -> Optional[str]:
+ try:
+ response_as_json = json.loads(response.http_response.text())
+ if response_as_json["error"]["code"] == "MissingSubscriptionRegistration":
+ # While "match" can in theory be None, if we saw "MissingSubscriptionRegistration" it won't happen
+ match = cast(re.Match, re.match(r".*'(.*)'", response_as_json["error"]["message"]))
+ return match.group(1)
+ except Exception: # pylint: disable=broad-except
+ pass
+ return None
+
+ @staticmethod
+ def _extract_subscription_url(url: str) -> str:
+ """Extract the first part of the URL, just after subscription:
+ https://management.azure.com/subscriptions/00000000-0000-0000-0000-000000000000/
+
+ :param str url: The URL to extract the subscription ID from
+ :return: The subscription ID
+ :rtype: str
+ """
+ match = re.match(r".*/subscriptions/[a-f0-9-]+/", url, re.IGNORECASE)
+ if not match:
+ raise ValueError("Unable to extract subscription ID from URL")
+ return match.group(0)
+
+ @staticmethod
+ def _build_next_request(
+ initial_request: PipelineRequest[HTTPRequestType], method: str, url: str
+ ) -> PipelineRequest[HTTPRequestType]:
+ request = HttpRequest(method, url)
+ context = PipelineContext(initial_request.context.transport, **initial_request.context.options)
+ return PipelineRequest(request, context)
+
+
+class ARMAutoResourceProviderRegistrationPolicy(
+ _SansIOARMAutoResourceProviderRegistrationPolicy, HTTPPolicy[HTTPRequestType, HTTPResponseType]
+): # pylint: disable=name-too-long
+ """Auto register an ARM resource provider if not done yet."""
+
+ def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
+ http_request = request.http_request
+ response = self.next.send(request)
+ if response.http_response.status_code == 409:
+ rp_name = self._check_rp_not_registered_err(response)
+ if rp_name:
+ url_prefix = self._extract_subscription_url(http_request.url)
+ if not self._register_rp(request, url_prefix, rp_name):
+ return response
+ # Change the 'x-ms-client-request-id' otherwise the Azure endpoint
+ # just returns the same 409 payload without looking at the actual query
+ if "x-ms-client-request-id" in http_request.headers:
+ http_request.headers["x-ms-client-request-id"] = str(uuid.uuid4())
+ response = self.next.send(request)
+ return response
+
+ def _register_rp(self, initial_request: PipelineRequest[HTTPRequestType], url_prefix: str, rp_name: str) -> bool:
+ """Synchronously register the RP is paremeter.
+
+ Return False if we have a reason to believe this didn't work
+
+ :param initial_request: The initial request
+ :type initial_request: ~azure.core.pipeline.PipelineRequest
+ :param str url_prefix: The url prefix
+ :param str rp_name: The resource provider name
+ :return: Return False if we have a reason to believe this didn't work
+ :rtype: bool
+ """
+ post_url = "{}providers/{}/register?api-version=2016-02-01".format(url_prefix, rp_name)
+ get_url = "{}providers/{}?api-version=2016-02-01".format(url_prefix, rp_name)
+ _LOGGER.warning(
+ "Resource provider '%s' used by this operation is not registered. We are registering for you.",
+ rp_name,
+ )
+ post_response = self.next.send(self._build_next_request(initial_request, "POST", post_url))
+ if post_response.http_response.status_code != 200:
+ _LOGGER.warning("Registration failed. Please register manually.")
+ return False
+
+ while True:
+ time.sleep(10)
+ get_response = self.next.send(self._build_next_request(initial_request, "GET", get_url))
+ rp_info = json.loads(get_response.http_response.text())
+ if rp_info["registrationState"] == "Registered":
+ _LOGGER.warning("Registration succeeded.")
+ return True
diff --git a/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base_async.py b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base_async.py
new file mode 100644
index 00000000..8ea7d397
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/mgmt/core/policies/_base_async.py
@@ -0,0 +1,105 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import asyncio
+import json
+import logging
+import uuid
+from typing import Union
+
+from azure.core.pipeline import PipelineRequest, PipelineResponse
+from azure.core.pipeline.policies import AsyncHTTPPolicy
+from azure.core.pipeline.transport import (
+ HttpRequest as LegacyHttpRequest,
+ AsyncHttpResponse as LegacyAsyncHttpResponse,
+)
+from azure.core.rest import HttpRequest, AsyncHttpResponse
+
+
+from ._base import _SansIOARMAutoResourceProviderRegistrationPolicy
+
+_LOGGER = logging.getLogger(__name__)
+
+HTTPRequestType = Union[LegacyHttpRequest, HttpRequest]
+AsyncHTTPResponseType = Union[LegacyAsyncHttpResponse, AsyncHttpResponse]
+PipelineResponseType = PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]
+
+
+class AsyncARMAutoResourceProviderRegistrationPolicy(
+ _SansIOARMAutoResourceProviderRegistrationPolicy, AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]
+): # pylint: disable=name-too-long
+ """Auto register an ARM resource provider if not done yet."""
+
+ async def send( # pylint: disable=invalid-overridden-method
+ self, request: PipelineRequest[HTTPRequestType]
+ ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]:
+ http_request = request.http_request
+ response = await self.next.send(request)
+ if response.http_response.status_code == 409:
+ rp_name = self._check_rp_not_registered_err(response)
+ if rp_name:
+ url_prefix = self._extract_subscription_url(http_request.url)
+ register_rp_status = await self._async_register_rp(request, url_prefix, rp_name)
+ if not register_rp_status:
+ return response
+ # Change the 'x-ms-client-request-id' otherwise the Azure endpoint
+ # just returns the same 409 payload without looking at the actual query
+ if "x-ms-client-request-id" in http_request.headers:
+ http_request.headers["x-ms-client-request-id"] = str(uuid.uuid4())
+ response = await self.next.send(request)
+ return response
+
+ async def _async_register_rp(
+ self, initial_request: PipelineRequest[HTTPRequestType], url_prefix: str, rp_name: str
+ ) -> bool:
+ """Synchronously register the RP is paremeter.
+
+ Return False if we have a reason to believe this didn't work
+
+ :param initial_request: The initial request
+ :type initial_request: ~azure.core.pipeline.PipelineRequest
+ :param str url_prefix: The url prefix
+ :param str rp_name: The resource provider name
+ :return: Return False if we have a reason to believe this didn't work
+ :rtype: bool
+ """
+ post_url = "{}providers/{}/register?api-version=2016-02-01".format(url_prefix, rp_name)
+ get_url = "{}providers/{}?api-version=2016-02-01".format(url_prefix, rp_name)
+ _LOGGER.warning(
+ "Resource provider '%s' used by this operation is not registered. We are registering for you.",
+ rp_name,
+ )
+ post_response = await self.next.send(self._build_next_request(initial_request, "POST", post_url))
+ if post_response.http_response.status_code != 200:
+ _LOGGER.warning("Registration failed. Please register manually.")
+ return False
+
+ while True:
+ await asyncio.sleep(10)
+ get_response = await self.next.send(self._build_next_request(initial_request, "GET", get_url))
+ rp_info = json.loads(get_response.http_response.text())
+ if rp_info["registrationState"] == "Registered":
+ _LOGGER.warning("Registration succeeded.")
+ return True