aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py888
1 files changed, 888 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py b/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py
new file mode 100644
index 00000000..91a12a84
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py
@@ -0,0 +1,888 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import abc
+import base64
+import json
+from enum import Enum
+from typing import (
+ Optional,
+ Any,
+ Tuple,
+ Callable,
+ Dict,
+ Sequence,
+ Generic,
+ TypeVar,
+ cast,
+ Union,
+)
+
+from ..exceptions import HttpResponseError, DecodeError
+from . import PollingMethod
+from ..pipeline.policies._utils import get_retry_after
+from ..pipeline._tools import is_rest
+from .._enum_meta import CaseInsensitiveEnumMeta
+from .. import PipelineClient
+from ..pipeline import PipelineResponse
+from ..pipeline.transport import (
+ HttpTransport,
+ HttpRequest as LegacyHttpRequest,
+ HttpResponse as LegacyHttpResponse,
+ AsyncHttpResponse as LegacyAsyncHttpResponse,
+)
+from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse
+
+
+HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
+HttpResponseType = Union[LegacyHttpResponse, HttpResponse] # Sync only
+AllHttpResponseType = Union[
+ LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
+] # Sync or async
+LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse]
+NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse]
+PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType]
+HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
+HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType) # Sync only
+AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType) # Sync or async
+
+ABC = abc.ABC
+PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
+PipelineClientType = TypeVar("PipelineClientType")
+HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True)
+HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True)
+
+
+_FINISHED = frozenset(["succeeded", "canceled", "failed"])
+_FAILED = frozenset(["canceled", "failed"])
+_SUCCEEDED = frozenset(["succeeded"])
+
+
+def _get_content(response: AllHttpResponseType) -> bytes:
+ """Get the content of this response. This is designed specifically to avoid
+ a warning of mypy for body() access, as this method is deprecated.
+
+ :param response: The response object.
+ :type response: any
+ :return: The content of this response.
+ :rtype: bytes
+ """
+ if isinstance(response, (LegacyHttpResponse, LegacyAsyncHttpResponse)):
+ return response.body()
+ return response.content
+
+
+def _finished(status):
+ if hasattr(status, "value"):
+ status = status.value
+ return str(status).lower() in _FINISHED
+
+
+def _failed(status):
+ if hasattr(status, "value"):
+ status = status.value
+ return str(status).lower() in _FAILED
+
+
+def _succeeded(status):
+ if hasattr(status, "value"):
+ status = status.value
+ return str(status).lower() in _SUCCEEDED
+
+
+class BadStatus(Exception):
+ pass
+
+
+class BadResponse(Exception):
+ pass
+
+
+class OperationFailed(Exception):
+ pass
+
+
+def _as_json(response: AllHttpResponseType) -> Dict[str, Any]:
+ """Assuming this is not empty, return the content as JSON.
+
+ Result/exceptions is not determined if you call this method without testing _is_empty.
+
+ :param response: The response object.
+ :type response: any
+ :return: The content of this response as dict.
+ :rtype: dict
+ :raises: DecodeError if response body contains invalid json data.
+ """
+ try:
+ return json.loads(response.text())
+ except ValueError as err:
+ raise DecodeError("Error occurred in deserializing the response body.") from err
+
+
+def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None:
+ """Check response status code is valid.
+
+ Must be 200, 201, 202, or 204.
+
+ :param response: The response object.
+ :type response: any
+ :raises: BadStatus if invalid status.
+ """
+ code = response.status_code
+ if code in {200, 201, 202, 204}:
+ return
+ raise BadStatus("Invalid return status {!r} for {!r} operation".format(code, response.request.method))
+
+
+def _is_empty(response: AllHttpResponseType) -> bool:
+ """Check if response body contains meaningful content.
+
+ :param response: The response object.
+ :type response: any
+ :return: True if response body is empty, False otherwise.
+ :rtype: bool
+ """
+ return not bool(_get_content(response))
+
+
+class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]):
+ """Protocol to implement for a long running operation algorithm."""
+
+ @abc.abstractmethod
+ def can_poll(
+ self,
+ pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
+ ) -> bool:
+ """Answer if this polling method could be used.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: True if this polling method could be used, False otherwise.
+ :rtype: bool
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def get_polling_url(self) -> str:
+ """Return the polling URL.
+
+ :return: The polling URL.
+ :rtype: str
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def set_initial_status(
+ self,
+ pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
+ ) -> str:
+ """Process first response after initiating long running operation.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The initial status.
+ :rtype: str
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def get_status(
+ self,
+ pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
+ ) -> str:
+ """Return the status string extracted from this response.
+
+ :param pipeline_response: The response object.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The status string.
+ :rtype: str
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def get_final_get_url(
+ self,
+ pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
+ ) -> Optional[str]:
+ """If a final GET is needed, returns the URL.
+
+ :param pipeline_response: Success REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The URL to the final GET, or None if no final GET is needed.
+ :rtype: str or None
+ """
+ raise NotImplementedError()
+
+
+class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
+ """Known LRO options from Swagger."""
+
+ FINAL_STATE_VIA = "final-state-via"
+
+
+class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
+ """Possible final-state-via options."""
+
+ AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
+ LOCATION_FINAL_STATE = "location"
+ OPERATION_LOCATION_FINAL_STATE = "operation-location"
+
+
+class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
+ """Implements a operation resource polling, typically from Operation-Location.
+
+ :param str operation_location_header: Name of the header to return operation format (default 'operation-location')
+ :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see
+ https://aka.ms/azsdk/autorest/openapi/lro-options
+ """
+
+ _async_url: str
+ """Url to resource monitor (AzureAsyncOperation or Operation-Location)"""
+
+ _location_url: Optional[str]
+ """Location header if present"""
+
+ _request: Any
+ """The initial request done"""
+
+ def __init__(
+ self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None
+ ):
+ self._operation_location_header = operation_location_header
+ self._location_url = None
+ self._lro_options = lro_options or {}
+
+ def can_poll(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> bool:
+ """Check if status monitor header (e.g. Operation-Location) is present.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: True if this polling method could be used, False otherwise.
+ :rtype: bool
+ """
+ response = pipeline_response.http_response
+ return self._operation_location_header in response.headers
+
+ def get_polling_url(self) -> str:
+ """Return the polling URL.
+
+ Will extract it from the defined header to read (e.g. Operation-Location)
+
+ :return: The polling URL.
+ :rtype: str
+ """
+ return self._async_url
+
+ def get_final_get_url(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> Optional[str]:
+ """If a final GET is needed, returns the URL.
+
+ :param pipeline_response: Success REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The URL to the final GET, or None if no final GET is needed.
+ :rtype: str or None
+ """
+ if (
+ self._lro_options.get(_LroOption.FINAL_STATE_VIA) == _FinalStateViaOption.LOCATION_FINAL_STATE
+ and self._location_url
+ ):
+ return self._location_url
+ if (
+ self._lro_options.get(_LroOption.FINAL_STATE_VIA)
+ in [
+ _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE,
+ _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE,
+ ]
+ and self._request.method == "POST"
+ ):
+ return None
+ response = pipeline_response.http_response
+ if not _is_empty(response):
+ body = _as_json(response)
+ # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location
+ resource_location = body.get("resourceLocation")
+ if resource_location:
+ return resource_location
+
+ if self._request.method in {"PUT", "PATCH"}:
+ return self._request.url
+
+ if self._request.method == "POST" and self._location_url:
+ return self._location_url
+
+ return None
+
+ def set_initial_status(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> str:
+ """Process first response after initiating long running operation.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The initial status.
+ :rtype: str
+ """
+ self._request = pipeline_response.http_response.request
+ response = pipeline_response.http_response
+
+ self._set_async_url_if_present(response)
+
+ if response.status_code in {200, 201, 202, 204} and self._async_url:
+ # Check if we can extract status from initial response, if present
+ try:
+ return self.get_status(pipeline_response)
+ # Wide catch, it may not even be JSON at all, deserialization is lenient
+ except Exception: # pylint: disable=broad-except
+ pass
+ return "InProgress"
+ raise OperationFailed("Operation failed or canceled")
+
+ def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None:
+ self._async_url = response.headers[self._operation_location_header]
+
+ location_url = response.headers.get("location")
+ if location_url:
+ self._location_url = location_url
+
+ def get_status(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> str:
+ """Process the latest status update retrieved from an "Operation-Location" header.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The status string.
+ :rtype: str
+ :raises: BadResponse if response has no body, or body does not contain status.
+ """
+ response = pipeline_response.http_response
+ if _is_empty(response):
+ raise BadResponse("The response from long running operation does not contain a body.")
+
+ body = _as_json(response)
+ status = body.get("status")
+ if not status:
+ raise BadResponse("No status found in body")
+ return status
+
+
+class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
+ """Implements a Location polling."""
+
+ _location_url: str
+ """Location header"""
+
+ def can_poll(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> bool:
+ """True if contains a Location header
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: True if this polling method could be used, False otherwise.
+ :rtype: bool
+ """
+ response = pipeline_response.http_response
+ return "location" in response.headers
+
+ def get_polling_url(self) -> str:
+ """Return the Location header value.
+
+ :return: The polling URL.
+ :rtype: str
+ """
+ return self._location_url
+
+ def get_final_get_url(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> Optional[str]:
+ """If a final GET is needed, returns the URL.
+
+ Always return None for a Location polling.
+
+ :param pipeline_response: Success REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: Always None for this implementation.
+ :rtype: None
+ """
+ return None
+
+ def set_initial_status(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> str:
+ """Process first response after initiating long running operation.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The initial status.
+ :rtype: str
+ """
+ response = pipeline_response.http_response
+
+ self._location_url = response.headers["location"]
+
+ if response.status_code in {200, 201, 202, 204} and self._location_url:
+ return "InProgress"
+ raise OperationFailed("Operation failed or canceled")
+
+ def get_status(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> str:
+ """Return the status string extracted from this response.
+
+ For Location polling, it means the status monitor returns 202.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The status string.
+ :rtype: str
+ """
+ response = pipeline_response.http_response
+ if "location" in response.headers:
+ self._location_url = response.headers["location"]
+
+ return "InProgress" if response.status_code == 202 else "Succeeded"
+
+
+class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
+ """Should be the fallback polling, that don't poll but exit successfully
+ if not other polling are detected and status code is 2xx.
+ """
+
+ def can_poll(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> bool:
+ """Answer if this polling method could be used.
+
+ For this implementation, always True.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: True if this polling method could be used, False otherwise.
+ :rtype: bool
+ """
+ return True
+
+ def get_polling_url(self) -> str:
+ """Return the polling URL.
+
+ This is not implemented for this polling, since we're never supposed to loop.
+
+ :return: The polling URL.
+ :rtype: str
+ """
+ raise ValueError("This polling doesn't support polling url")
+
+ def set_initial_status(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> str:
+ """Process first response after initiating long running operation.
+
+ Will succeed immediately.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The initial status.
+ :rtype: str
+ """
+ return "Succeeded"
+
+ def get_status(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> str:
+ """Return the status string extracted from this response.
+
+ Only possible status is success.
+
+ :param pipeline_response: Initial REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The status string.
+ :rtype: str
+ """
+ return "Succeeded"
+
+ def get_final_get_url(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> Optional[str]:
+ """If a final GET is needed, returns the URL.
+
+ :param pipeline_response: Success REST call response.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :rtype: str
+ :return: Always None for this implementation.
+ """
+ return None
+
+
+class _SansIOLROBasePolling(
+ Generic[
+ PollingReturnType_co,
+ PipelineClientType,
+ HttpRequestTypeVar,
+ AllHttpResponseTypeVar,
+ ]
+): # pylint: disable=too-many-instance-attributes
+ """A base class that has no opinion on IO, to help mypy be accurate.
+
+ :param float timeout: Default polling internal in absence of Retry-After header, in seconds.
+ :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use.
+ :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring.
+ :type lro_options: dict[str, any]
+ :param path_format_arguments: A dictionary of the format arguments to be used to format the URL.
+ :type path_format_arguments: dict[str, str]
+ """
+
+ _deserialization_callback: Callable[[Any], PollingReturnType_co]
+ """The deserialization callback that returns the final instance."""
+
+ _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]
+ """The algorithm this poller has decided to use. Will loop through 'can_poll' of the input algorithms to decide."""
+
+ _status: str
+ """Hold the current status of this poller"""
+
+ _client: PipelineClientType
+ """The Azure Core Pipeline client used to make request."""
+
+ def __init__(
+ self,
+ timeout: float = 30,
+ lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None,
+ lro_options: Optional[Dict[str, Any]] = None,
+ path_format_arguments: Optional[Dict[str, str]] = None,
+ **operation_config: Any
+ ):
+ self._lro_algorithms = lro_algorithms or [
+ OperationResourcePolling(lro_options=lro_options),
+ LocationPolling(),
+ StatusCheckPolling(),
+ ]
+
+ self._timeout = timeout
+ self._operation_config = operation_config
+ self._lro_options = lro_options
+ self._path_format_arguments = path_format_arguments
+
+ def initialize(
+ self,
+ client: PipelineClientType,
+ initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ deserialization_callback: Callable[
+ [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]],
+ PollingReturnType_co,
+ ],
+ ) -> None:
+ """Set the initial status of this LRO.
+
+ :param client: The Azure Core Pipeline client used to make request.
+ :type client: ~azure.core.pipeline.PipelineClient
+ :param initial_response: The initial response for the call.
+ :type initial_response: ~azure.core.pipeline.PipelineResponse
+ :param deserialization_callback: A callback function to deserialize the final response.
+ :type deserialization_callback: callable
+ :raises: HttpResponseError if initial status is incorrect LRO state
+ """
+ self._client = client
+ self._pipeline_response = ( # pylint: disable=attribute-defined-outside-init
+ self._initial_response # pylint: disable=attribute-defined-outside-init
+ ) = initial_response
+ self._deserialization_callback = deserialization_callback
+
+ for operation in self._lro_algorithms:
+ if operation.can_poll(initial_response):
+ self._operation = operation
+ break
+ else:
+ raise BadResponse("Unable to find status link for polling.")
+
+ try:
+ _raise_if_bad_http_status_and_method(self._initial_response.http_response)
+ self._status = self._operation.set_initial_status(initial_response)
+
+ except BadStatus as err:
+ self._status = "Failed"
+ raise HttpResponseError(response=initial_response.http_response, error=err) from err
+ except BadResponse as err:
+ self._status = "Failed"
+ raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err
+ except OperationFailed as err:
+ raise HttpResponseError(response=initial_response.http_response, error=err) from err
+
+ def get_continuation_token(self) -> str:
+ import pickle
+
+ return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii")
+
+ @classmethod
+ def from_continuation_token(
+ cls, continuation_token: str, **kwargs: Any
+ ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
+ try:
+ client = kwargs["client"]
+ except KeyError:
+ raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None
+
+ try:
+ deserialization_callback = kwargs["deserialization_callback"]
+ except KeyError:
+ raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None
+
+ import pickle
+
+ initial_response = pickle.loads(base64.b64decode(continuation_token)) # nosec
+ # Restore the transport in the context
+ initial_response.context.transport = client._pipeline._transport # pylint: disable=protected-access
+ return client, initial_response, deserialization_callback
+
+ def status(self) -> str:
+ """Return the current status as a string.
+
+ :rtype: str
+ :return: The current status.
+ """
+ if not self._operation:
+ raise ValueError("set_initial_status was never called. Did you give this instance to a poller?")
+ return self._status
+
+ def finished(self) -> bool:
+ """Is this polling finished?
+
+ :rtype: bool
+ :return: True if finished, False otherwise.
+ """
+ return _finished(self.status())
+
+ def resource(self) -> PollingReturnType_co:
+ """Return the built resource.
+
+ :rtype: any
+ :return: The built resource.
+ """
+ return self._parse_resource(self._pipeline_response)
+
+ def _parse_resource(
+ self,
+ pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+ ) -> PollingReturnType_co:
+ """Assuming this response is a resource, use the deserialization callback to parse it.
+ If body is empty, assuming no resource to return.
+
+ :param pipeline_response: The response object.
+ :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+ :return: The parsed resource.
+ :rtype: any
+ """
+ response = pipeline_response.http_response
+ if not _is_empty(response):
+ return self._deserialization_callback(pipeline_response)
+
+ # This "type ignore" has been discussed with architects.
+ # We have a typing problem that if the Swagger/TSP describes a return type (PollingReturnType_co is not None),
+ # BUT the returned payload is actually empty, we don't want to fail, but return None.
+ # To be clean, we would have to make the polling return type Optional "just in case the Swagger/TSP is wrong".
+ # This is reducing the quality and the value of the typing annotations
+ # for a case that is not supposed to happen in the first place. So we decided to ignore the type error here.
+ return None # type: ignore
+
+ def _get_request_id(self) -> str:
+ return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"]
+
+ def _extract_delay(self) -> float:
+ delay = get_retry_after(self._pipeline_response)
+ if delay:
+ return delay
+ return self._timeout
+
+
+class LROBasePolling(
+ _SansIOLROBasePolling[
+ PollingReturnType_co,
+ PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar],
+ HttpRequestTypeVar,
+ HttpResponseTypeVar,
+ ],
+ PollingMethod[PollingReturnType_co],
+):
+ """A base LRO poller.
+
+ This assumes a basic flow:
+ - I analyze the response to decide the polling approach
+ - I poll
+ - I ask the final resource depending of the polling approach
+
+ If your polling need are more specific, you could implement a PollingMethod directly
+ """
+
+ _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
+ """Store the initial response."""
+
+ _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
+ """Store the latest received HTTP response, initialized by the first answer."""
+
+ @property
+ def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]:
+ return self._client._pipeline._transport # pylint: disable=protected-access
+
+ def __getattribute__(self, name: str) -> Any:
+ """Find the right method for the job.
+
+ This contains a workaround for azure-mgmt-core 1.0.0 to 1.4.0, where the MRO
+ is changing when azure-core was refactored in 1.27.0. The MRO change was causing
+ AsyncARMPolling to look-up the wrong methods and find the non-async ones.
+
+ :param str name: The name of the attribute to retrieve.
+ :rtype: Any
+ :return: The attribute value.
+ """
+ cls = object.__getattribute__(self, "__class__")
+ if cls.__name__ == "AsyncARMPolling" and name in [
+ "run",
+ "update_status",
+ "request_status",
+ "_sleep",
+ "_delay",
+ "_poll",
+ ]:
+ return getattr(super(LROBasePolling, self), name)
+ return super().__getattribute__(name)
+
+ def run(self) -> None:
+ try:
+ self._poll()
+
+ except BadStatus as err:
+ self._status = "Failed"
+ raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err
+
+ except BadResponse as err:
+ self._status = "Failed"
+ raise HttpResponseError(
+ response=self._pipeline_response.http_response,
+ message=str(err),
+ error=err,
+ ) from err
+
+ except OperationFailed as err:
+ raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err
+
+ def _poll(self) -> None:
+ """Poll status of operation so long as operation is incomplete and
+ we have an endpoint to query.
+
+ :raises: OperationFailed if operation status 'Failed' or 'Canceled'.
+ :raises: BadStatus if response status invalid.
+ :raises: BadResponse if response invalid.
+ """
+ if not self.finished():
+ self.update_status()
+ while not self.finished():
+ self._delay()
+ self.update_status()
+
+ if _failed(self.status()):
+ raise OperationFailed("Operation failed or canceled")
+
+ final_get_url = self._operation.get_final_get_url(self._pipeline_response)
+ if final_get_url:
+ self._pipeline_response = self.request_status(final_get_url)
+ _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
+
+ def _sleep(self, delay: float) -> None:
+ self._transport.sleep(delay)
+
+ def _delay(self) -> None:
+ """Check for a 'retry-after' header to set timeout,
+ otherwise use configured timeout.
+ """
+ delay = self._extract_delay()
+ self._sleep(delay)
+
+ def update_status(self) -> None:
+ """Update the current status of the LRO."""
+ self._pipeline_response = self.request_status(self._operation.get_polling_url())
+ _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
+ self._status = self._operation.get_status(self._pipeline_response)
+
+ def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]:
+ """Do a simple GET to this status link.
+
+ This method re-inject 'x-ms-client-request-id'.
+
+ :param str status_link: The URL to poll.
+ :rtype: azure.core.pipeline.PipelineResponse
+ :return: The response of the status request.
+ """
+ if self._path_format_arguments:
+ status_link = self._client.format_url(status_link, **self._path_format_arguments)
+ # Re-inject 'x-ms-client-request-id' while polling
+ if "request_id" not in self._operation_config:
+ self._operation_config["request_id"] = self._get_request_id()
+
+ if is_rest(self._initial_response.http_response):
+ rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
+ # Need a cast, as "_return_pipeline_response" mutate the return type, and that return type is not
+ # declared in the typing of "send_request"
+ return cast(
+ PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
+ self._client.send_request(rest_request, _return_pipeline_response=True, **self._operation_config),
+ )
+
+ # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport
+ # casting things here, as we don't want the typing system to know
+ # about the legacy APIs.
+ request = cast(HttpRequestTypeVar, self._client.get(status_link))
+ return cast(
+ PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
+ self._client._pipeline.run( # pylint: disable=protected-access
+ request, stream=False, **self._operation_config
+ ),
+ )
+
+
+__all__ = [
+ "BadResponse",
+ "BadStatus",
+ "OperationFailed",
+ "LongRunningOperation",
+ "OperationResourcePolling",
+ "LocationPolling",
+ "StatusCheckPolling",
+ "LROBasePolling",
+]