diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py | 888 |
1 files changed, 888 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py b/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py new file mode 100644 index 00000000..91a12a84 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py @@ -0,0 +1,888 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import abc +import base64 +import json +from enum import Enum +from typing import ( + Optional, + Any, + Tuple, + Callable, + Dict, + Sequence, + Generic, + TypeVar, + cast, + Union, +) + +from ..exceptions import HttpResponseError, DecodeError +from . import PollingMethod +from ..pipeline.policies._utils import get_retry_after +from ..pipeline._tools import is_rest +from .._enum_meta import CaseInsensitiveEnumMeta +from .. import PipelineClient +from ..pipeline import PipelineResponse +from ..pipeline.transport import ( + HttpTransport, + HttpRequest as LegacyHttpRequest, + HttpResponse as LegacyHttpResponse, + AsyncHttpResponse as LegacyAsyncHttpResponse, +) +from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse + + +HttpRequestType = Union[LegacyHttpRequest, HttpRequest] +HttpResponseType = Union[LegacyHttpResponse, HttpResponse] # Sync only +AllHttpResponseType = Union[ + LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse +] # Sync or async +LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse] +NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse] +PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType] +HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType) +HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType) # Sync only +AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType) # Sync or async + +ABC = abc.ABC +PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True) +PipelineClientType = TypeVar("PipelineClientType") +HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True) +HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True) + + +_FINISHED = frozenset(["succeeded", "canceled", "failed"]) +_FAILED = frozenset(["canceled", "failed"]) +_SUCCEEDED = frozenset(["succeeded"]) + + +def _get_content(response: AllHttpResponseType) -> bytes: + """Get the content of this response. This is designed specifically to avoid + a warning of mypy for body() access, as this method is deprecated. + + :param response: The response object. + :type response: any + :return: The content of this response. + :rtype: bytes + """ + if isinstance(response, (LegacyHttpResponse, LegacyAsyncHttpResponse)): + return response.body() + return response.content + + +def _finished(status): + if hasattr(status, "value"): + status = status.value + return str(status).lower() in _FINISHED + + +def _failed(status): + if hasattr(status, "value"): + status = status.value + return str(status).lower() in _FAILED + + +def _succeeded(status): + if hasattr(status, "value"): + status = status.value + return str(status).lower() in _SUCCEEDED + + +class BadStatus(Exception): + pass + + +class BadResponse(Exception): + pass + + +class OperationFailed(Exception): + pass + + +def _as_json(response: AllHttpResponseType) -> Dict[str, Any]: + """Assuming this is not empty, return the content as JSON. + + Result/exceptions is not determined if you call this method without testing _is_empty. + + :param response: The response object. + :type response: any + :return: The content of this response as dict. + :rtype: dict + :raises: DecodeError if response body contains invalid json data. + """ + try: + return json.loads(response.text()) + except ValueError as err: + raise DecodeError("Error occurred in deserializing the response body.") from err + + +def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None: + """Check response status code is valid. + + Must be 200, 201, 202, or 204. + + :param response: The response object. + :type response: any + :raises: BadStatus if invalid status. + """ + code = response.status_code + if code in {200, 201, 202, 204}: + return + raise BadStatus("Invalid return status {!r} for {!r} operation".format(code, response.request.method)) + + +def _is_empty(response: AllHttpResponseType) -> bool: + """Check if response body contains meaningful content. + + :param response: The response object. + :type response: any + :return: True if response body is empty, False otherwise. + :rtype: bool + """ + return not bool(_get_content(response)) + + +class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]): + """Protocol to implement for a long running operation algorithm.""" + + @abc.abstractmethod + def can_poll( + self, + pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co], + ) -> bool: + """Answer if this polling method could be used. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: True if this polling method could be used, False otherwise. + :rtype: bool + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_polling_url(self) -> str: + """Return the polling URL. + + :return: The polling URL. + :rtype: str + """ + raise NotImplementedError() + + @abc.abstractmethod + def set_initial_status( + self, + pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co], + ) -> str: + """Process first response after initiating long running operation. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The initial status. + :rtype: str + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_status( + self, + pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co], + ) -> str: + """Return the status string extracted from this response. + + :param pipeline_response: The response object. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The status string. + :rtype: str + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_final_get_url( + self, + pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co], + ) -> Optional[str]: + """If a final GET is needed, returns the URL. + + :param pipeline_response: Success REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The URL to the final GET, or None if no final GET is needed. + :rtype: str or None + """ + raise NotImplementedError() + + +class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta): + """Known LRO options from Swagger.""" + + FINAL_STATE_VIA = "final-state-via" + + +class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta): + """Possible final-state-via options.""" + + AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation" + LOCATION_FINAL_STATE = "location" + OPERATION_LOCATION_FINAL_STATE = "operation-location" + + +class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]): + """Implements a operation resource polling, typically from Operation-Location. + + :param str operation_location_header: Name of the header to return operation format (default 'operation-location') + :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see + https://aka.ms/azsdk/autorest/openapi/lro-options + """ + + _async_url: str + """Url to resource monitor (AzureAsyncOperation or Operation-Location)""" + + _location_url: Optional[str] + """Location header if present""" + + _request: Any + """The initial request done""" + + def __init__( + self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None + ): + self._operation_location_header = operation_location_header + self._location_url = None + self._lro_options = lro_options or {} + + def can_poll( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> bool: + """Check if status monitor header (e.g. Operation-Location) is present. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: True if this polling method could be used, False otherwise. + :rtype: bool + """ + response = pipeline_response.http_response + return self._operation_location_header in response.headers + + def get_polling_url(self) -> str: + """Return the polling URL. + + Will extract it from the defined header to read (e.g. Operation-Location) + + :return: The polling URL. + :rtype: str + """ + return self._async_url + + def get_final_get_url( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> Optional[str]: + """If a final GET is needed, returns the URL. + + :param pipeline_response: Success REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The URL to the final GET, or None if no final GET is needed. + :rtype: str or None + """ + if ( + self._lro_options.get(_LroOption.FINAL_STATE_VIA) == _FinalStateViaOption.LOCATION_FINAL_STATE + and self._location_url + ): + return self._location_url + if ( + self._lro_options.get(_LroOption.FINAL_STATE_VIA) + in [ + _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE, + _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE, + ] + and self._request.method == "POST" + ): + return None + response = pipeline_response.http_response + if not _is_empty(response): + body = _as_json(response) + # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location + resource_location = body.get("resourceLocation") + if resource_location: + return resource_location + + if self._request.method in {"PUT", "PATCH"}: + return self._request.url + + if self._request.method == "POST" and self._location_url: + return self._location_url + + return None + + def set_initial_status( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> str: + """Process first response after initiating long running operation. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The initial status. + :rtype: str + """ + self._request = pipeline_response.http_response.request + response = pipeline_response.http_response + + self._set_async_url_if_present(response) + + if response.status_code in {200, 201, 202, 204} and self._async_url: + # Check if we can extract status from initial response, if present + try: + return self.get_status(pipeline_response) + # Wide catch, it may not even be JSON at all, deserialization is lenient + except Exception: # pylint: disable=broad-except + pass + return "InProgress" + raise OperationFailed("Operation failed or canceled") + + def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None: + self._async_url = response.headers[self._operation_location_header] + + location_url = response.headers.get("location") + if location_url: + self._location_url = location_url + + def get_status( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> str: + """Process the latest status update retrieved from an "Operation-Location" header. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The status string. + :rtype: str + :raises: BadResponse if response has no body, or body does not contain status. + """ + response = pipeline_response.http_response + if _is_empty(response): + raise BadResponse("The response from long running operation does not contain a body.") + + body = _as_json(response) + status = body.get("status") + if not status: + raise BadResponse("No status found in body") + return status + + +class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]): + """Implements a Location polling.""" + + _location_url: str + """Location header""" + + def can_poll( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> bool: + """True if contains a Location header + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: True if this polling method could be used, False otherwise. + :rtype: bool + """ + response = pipeline_response.http_response + return "location" in response.headers + + def get_polling_url(self) -> str: + """Return the Location header value. + + :return: The polling URL. + :rtype: str + """ + return self._location_url + + def get_final_get_url( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> Optional[str]: + """If a final GET is needed, returns the URL. + + Always return None for a Location polling. + + :param pipeline_response: Success REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: Always None for this implementation. + :rtype: None + """ + return None + + def set_initial_status( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> str: + """Process first response after initiating long running operation. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The initial status. + :rtype: str + """ + response = pipeline_response.http_response + + self._location_url = response.headers["location"] + + if response.status_code in {200, 201, 202, 204} and self._location_url: + return "InProgress" + raise OperationFailed("Operation failed or canceled") + + def get_status( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> str: + """Return the status string extracted from this response. + + For Location polling, it means the status monitor returns 202. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The status string. + :rtype: str + """ + response = pipeline_response.http_response + if "location" in response.headers: + self._location_url = response.headers["location"] + + return "InProgress" if response.status_code == 202 else "Succeeded" + + +class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]): + """Should be the fallback polling, that don't poll but exit successfully + if not other polling are detected and status code is 2xx. + """ + + def can_poll( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> bool: + """Answer if this polling method could be used. + + For this implementation, always True. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: True if this polling method could be used, False otherwise. + :rtype: bool + """ + return True + + def get_polling_url(self) -> str: + """Return the polling URL. + + This is not implemented for this polling, since we're never supposed to loop. + + :return: The polling URL. + :rtype: str + """ + raise ValueError("This polling doesn't support polling url") + + def set_initial_status( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> str: + """Process first response after initiating long running operation. + + Will succeed immediately. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The initial status. + :rtype: str + """ + return "Succeeded" + + def get_status( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> str: + """Return the status string extracted from this response. + + Only possible status is success. + + :param pipeline_response: Initial REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The status string. + :rtype: str + """ + return "Succeeded" + + def get_final_get_url( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> Optional[str]: + """If a final GET is needed, returns the URL. + + :param pipeline_response: Success REST call response. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :rtype: str + :return: Always None for this implementation. + """ + return None + + +class _SansIOLROBasePolling( + Generic[ + PollingReturnType_co, + PipelineClientType, + HttpRequestTypeVar, + AllHttpResponseTypeVar, + ] +): # pylint: disable=too-many-instance-attributes + """A base class that has no opinion on IO, to help mypy be accurate. + + :param float timeout: Default polling internal in absence of Retry-After header, in seconds. + :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use. + :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring. + :type lro_options: dict[str, any] + :param path_format_arguments: A dictionary of the format arguments to be used to format the URL. + :type path_format_arguments: dict[str, str] + """ + + _deserialization_callback: Callable[[Any], PollingReturnType_co] + """The deserialization callback that returns the final instance.""" + + _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar] + """The algorithm this poller has decided to use. Will loop through 'can_poll' of the input algorithms to decide.""" + + _status: str + """Hold the current status of this poller""" + + _client: PipelineClientType + """The Azure Core Pipeline client used to make request.""" + + def __init__( + self, + timeout: float = 30, + lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None, + lro_options: Optional[Dict[str, Any]] = None, + path_format_arguments: Optional[Dict[str, str]] = None, + **operation_config: Any + ): + self._lro_algorithms = lro_algorithms or [ + OperationResourcePolling(lro_options=lro_options), + LocationPolling(), + StatusCheckPolling(), + ] + + self._timeout = timeout + self._operation_config = operation_config + self._lro_options = lro_options + self._path_format_arguments = path_format_arguments + + def initialize( + self, + client: PipelineClientType, + initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + deserialization_callback: Callable[ + [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]], + PollingReturnType_co, + ], + ) -> None: + """Set the initial status of this LRO. + + :param client: The Azure Core Pipeline client used to make request. + :type client: ~azure.core.pipeline.PipelineClient + :param initial_response: The initial response for the call. + :type initial_response: ~azure.core.pipeline.PipelineResponse + :param deserialization_callback: A callback function to deserialize the final response. + :type deserialization_callback: callable + :raises: HttpResponseError if initial status is incorrect LRO state + """ + self._client = client + self._pipeline_response = ( # pylint: disable=attribute-defined-outside-init + self._initial_response # pylint: disable=attribute-defined-outside-init + ) = initial_response + self._deserialization_callback = deserialization_callback + + for operation in self._lro_algorithms: + if operation.can_poll(initial_response): + self._operation = operation + break + else: + raise BadResponse("Unable to find status link for polling.") + + try: + _raise_if_bad_http_status_and_method(self._initial_response.http_response) + self._status = self._operation.set_initial_status(initial_response) + + except BadStatus as err: + self._status = "Failed" + raise HttpResponseError(response=initial_response.http_response, error=err) from err + except BadResponse as err: + self._status = "Failed" + raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err + except OperationFailed as err: + raise HttpResponseError(response=initial_response.http_response, error=err) from err + + def get_continuation_token(self) -> str: + import pickle + + return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii") + + @classmethod + def from_continuation_token( + cls, continuation_token: str, **kwargs: Any + ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]: + try: + client = kwargs["client"] + except KeyError: + raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None + + try: + deserialization_callback = kwargs["deserialization_callback"] + except KeyError: + raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None + + import pickle + + initial_response = pickle.loads(base64.b64decode(continuation_token)) # nosec + # Restore the transport in the context + initial_response.context.transport = client._pipeline._transport # pylint: disable=protected-access + return client, initial_response, deserialization_callback + + def status(self) -> str: + """Return the current status as a string. + + :rtype: str + :return: The current status. + """ + if not self._operation: + raise ValueError("set_initial_status was never called. Did you give this instance to a poller?") + return self._status + + def finished(self) -> bool: + """Is this polling finished? + + :rtype: bool + :return: True if finished, False otherwise. + """ + return _finished(self.status()) + + def resource(self) -> PollingReturnType_co: + """Return the built resource. + + :rtype: any + :return: The built resource. + """ + return self._parse_resource(self._pipeline_response) + + def _parse_resource( + self, + pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], + ) -> PollingReturnType_co: + """Assuming this response is a resource, use the deserialization callback to parse it. + If body is empty, assuming no resource to return. + + :param pipeline_response: The response object. + :type pipeline_response: ~azure.core.pipeline.PipelineResponse + :return: The parsed resource. + :rtype: any + """ + response = pipeline_response.http_response + if not _is_empty(response): + return self._deserialization_callback(pipeline_response) + + # This "type ignore" has been discussed with architects. + # We have a typing problem that if the Swagger/TSP describes a return type (PollingReturnType_co is not None), + # BUT the returned payload is actually empty, we don't want to fail, but return None. + # To be clean, we would have to make the polling return type Optional "just in case the Swagger/TSP is wrong". + # This is reducing the quality and the value of the typing annotations + # for a case that is not supposed to happen in the first place. So we decided to ignore the type error here. + return None # type: ignore + + def _get_request_id(self) -> str: + return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"] + + def _extract_delay(self) -> float: + delay = get_retry_after(self._pipeline_response) + if delay: + return delay + return self._timeout + + +class LROBasePolling( + _SansIOLROBasePolling[ + PollingReturnType_co, + PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar], + HttpRequestTypeVar, + HttpResponseTypeVar, + ], + PollingMethod[PollingReturnType_co], +): + """A base LRO poller. + + This assumes a basic flow: + - I analyze the response to decide the polling approach + - I poll + - I ask the final resource depending of the polling approach + + If your polling need are more specific, you could implement a PollingMethod directly + """ + + _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar] + """Store the initial response.""" + + _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar] + """Store the latest received HTTP response, initialized by the first answer.""" + + @property + def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]: + return self._client._pipeline._transport # pylint: disable=protected-access + + def __getattribute__(self, name: str) -> Any: + """Find the right method for the job. + + This contains a workaround for azure-mgmt-core 1.0.0 to 1.4.0, where the MRO + is changing when azure-core was refactored in 1.27.0. The MRO change was causing + AsyncARMPolling to look-up the wrong methods and find the non-async ones. + + :param str name: The name of the attribute to retrieve. + :rtype: Any + :return: The attribute value. + """ + cls = object.__getattribute__(self, "__class__") + if cls.__name__ == "AsyncARMPolling" and name in [ + "run", + "update_status", + "request_status", + "_sleep", + "_delay", + "_poll", + ]: + return getattr(super(LROBasePolling, self), name) + return super().__getattribute__(name) + + def run(self) -> None: + try: + self._poll() + + except BadStatus as err: + self._status = "Failed" + raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err + + except BadResponse as err: + self._status = "Failed" + raise HttpResponseError( + response=self._pipeline_response.http_response, + message=str(err), + error=err, + ) from err + + except OperationFailed as err: + raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err + + def _poll(self) -> None: + """Poll status of operation so long as operation is incomplete and + we have an endpoint to query. + + :raises: OperationFailed if operation status 'Failed' or 'Canceled'. + :raises: BadStatus if response status invalid. + :raises: BadResponse if response invalid. + """ + if not self.finished(): + self.update_status() + while not self.finished(): + self._delay() + self.update_status() + + if _failed(self.status()): + raise OperationFailed("Operation failed or canceled") + + final_get_url = self._operation.get_final_get_url(self._pipeline_response) + if final_get_url: + self._pipeline_response = self.request_status(final_get_url) + _raise_if_bad_http_status_and_method(self._pipeline_response.http_response) + + def _sleep(self, delay: float) -> None: + self._transport.sleep(delay) + + def _delay(self) -> None: + """Check for a 'retry-after' header to set timeout, + otherwise use configured timeout. + """ + delay = self._extract_delay() + self._sleep(delay) + + def update_status(self) -> None: + """Update the current status of the LRO.""" + self._pipeline_response = self.request_status(self._operation.get_polling_url()) + _raise_if_bad_http_status_and_method(self._pipeline_response.http_response) + self._status = self._operation.get_status(self._pipeline_response) + + def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]: + """Do a simple GET to this status link. + + This method re-inject 'x-ms-client-request-id'. + + :param str status_link: The URL to poll. + :rtype: azure.core.pipeline.PipelineResponse + :return: The response of the status request. + """ + if self._path_format_arguments: + status_link = self._client.format_url(status_link, **self._path_format_arguments) + # Re-inject 'x-ms-client-request-id' while polling + if "request_id" not in self._operation_config: + self._operation_config["request_id"] = self._get_request_id() + + if is_rest(self._initial_response.http_response): + rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link)) + # Need a cast, as "_return_pipeline_response" mutate the return type, and that return type is not + # declared in the typing of "send_request" + return cast( + PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar], + self._client.send_request(rest_request, _return_pipeline_response=True, **self._operation_config), + ) + + # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport + # casting things here, as we don't want the typing system to know + # about the legacy APIs. + request = cast(HttpRequestTypeVar, self._client.get(status_link)) + return cast( + PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar], + self._client._pipeline.run( # pylint: disable=protected-access + request, stream=False, **self._operation_config + ), + ) + + +__all__ = [ + "BadResponse", + "BadStatus", + "OperationFailed", + "LongRunningOperation", + "OperationResourcePolling", + "LocationPolling", + "StatusCheckPolling", + "LROBasePolling", +] |