about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/core/polling
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/core/polling')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/polling/__init__.py42
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/polling/_async_poller.py211
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/polling/_poller.py306
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/polling/async_base_polling.py182
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py888
5 files changed, 1629 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/core/polling/__init__.py b/.venv/lib/python3.12/site-packages/azure/core/polling/__init__.py
new file mode 100644
index 00000000..193e76fa
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/polling/__init__.py
@@ -0,0 +1,42 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from ._poller import LROPoller, NoPolling, PollingMethod
+from ._async_poller import (
+    AsyncNoPolling,
+    AsyncPollingMethod,
+    async_poller,
+    AsyncLROPoller,
+)
+
+__all__ = [
+    "LROPoller",
+    "NoPolling",
+    "PollingMethod",
+    "AsyncNoPolling",
+    "AsyncPollingMethod",
+    "async_poller",
+    "AsyncLROPoller",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/core/polling/_async_poller.py b/.venv/lib/python3.12/site-packages/azure/core/polling/_async_poller.py
new file mode 100644
index 00000000..611e8909
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/polling/_async_poller.py
@@ -0,0 +1,211 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import logging
+from typing import Callable, Any, Tuple, Generic, TypeVar, Generator, Awaitable
+
+from ..exceptions import AzureError
+from ._poller import _SansIONoPolling
+
+
+PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
+DeserializationCallbackType = Any
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class AsyncPollingMethod(Generic[PollingReturnType_co]):
+    """ABC class for polling method."""
+
+    def initialize(
+        self,
+        client: Any,
+        initial_response: Any,
+        deserialization_callback: DeserializationCallbackType,
+    ) -> None:
+        raise NotImplementedError("This method needs to be implemented")
+
+    async def run(self) -> None:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def status(self) -> str:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def finished(self) -> bool:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def resource(self) -> PollingReturnType_co:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def get_continuation_token(self) -> str:
+        raise TypeError("Polling method '{}' doesn't support get_continuation_token".format(self.__class__.__name__))
+
+    @classmethod
+    def from_continuation_token(
+        cls, continuation_token: str, **kwargs: Any
+    ) -> Tuple[Any, Any, DeserializationCallbackType]:
+        raise TypeError("Polling method '{}' doesn't support from_continuation_token".format(cls.__name__))
+
+
+class AsyncNoPolling(_SansIONoPolling[PollingReturnType_co], AsyncPollingMethod[PollingReturnType_co]):
+    """An empty async poller that returns the deserialized initial response."""
+
+    async def run(self) -> None:
+        """Empty run, no polling.
+        Just override initial run to add "async"
+        """
+
+
+async def async_poller(
+    client: Any,
+    initial_response: Any,
+    deserialization_callback: Callable[[Any], PollingReturnType_co],
+    polling_method: AsyncPollingMethod[PollingReturnType_co],
+) -> PollingReturnType_co:
+    """Async Poller for long running operations.
+
+    .. deprecated:: 1.5.0
+       Use :class:`AsyncLROPoller` instead.
+
+    :param client: A pipeline service client.
+    :type client: ~azure.core.PipelineClient
+    :param initial_response: The initial call response
+    :type initial_response: ~azure.core.pipeline.PipelineResponse
+    :param deserialization_callback: A callback that takes a Response and return a deserialized object.
+                                     If a subclass of Model is given, this passes "deserialize" as callback.
+    :type deserialization_callback: callable or msrest.serialization.Model
+    :param polling_method: The polling strategy to adopt
+    :type polling_method: ~azure.core.polling.PollingMethod
+    :return: The final resource at the end of the polling.
+    :rtype: any or None
+    """
+    poller = AsyncLROPoller(client, initial_response, deserialization_callback, polling_method)
+    return await poller
+
+
+class AsyncLROPoller(Generic[PollingReturnType_co], Awaitable[PollingReturnType_co]):
+    """Async poller for long running operations.
+
+    :param client: A pipeline service client
+    :type client: ~azure.core.PipelineClient
+    :param initial_response: The initial call response
+    :type initial_response: ~azure.core.pipeline.PipelineResponse
+    :param deserialization_callback: A callback that takes a Response and return a deserialized object.
+                                     If a subclass of Model is given, this passes "deserialize" as callback.
+    :type deserialization_callback: callable or msrest.serialization.Model
+    :param polling_method: The polling strategy to adopt
+    :type polling_method: ~azure.core.polling.AsyncPollingMethod
+    """
+
+    def __init__(
+        self,
+        client: Any,
+        initial_response: Any,
+        deserialization_callback: Callable[[Any], PollingReturnType_co],
+        polling_method: AsyncPollingMethod[PollingReturnType_co],
+    ):
+        self._polling_method = polling_method
+        self._done = False
+
+        # This implicit test avoids bringing in an explicit dependency on Model directly
+        try:
+            deserialization_callback = deserialization_callback.deserialize  # type: ignore
+        except AttributeError:
+            pass
+
+        self._polling_method.initialize(client, initial_response, deserialization_callback)
+
+    def polling_method(self) -> AsyncPollingMethod[PollingReturnType_co]:
+        """Return the polling method associated to this poller.
+
+        :return: The polling method associated to this poller.
+        :rtype: ~azure.core.polling.AsyncPollingMethod
+        """
+        return self._polling_method
+
+    def continuation_token(self) -> str:
+        """Return a continuation token that allows to restart the poller later.
+
+        :returns: An opaque continuation token
+        :rtype: str
+        """
+        return self._polling_method.get_continuation_token()
+
+    @classmethod
+    def from_continuation_token(
+        cls, polling_method: AsyncPollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any
+    ) -> "AsyncLROPoller[PollingReturnType_co]":
+        (
+            client,
+            initial_response,
+            deserialization_callback,
+        ) = polling_method.from_continuation_token(continuation_token, **kwargs)
+        return cls(client, initial_response, deserialization_callback, polling_method)
+
+    def status(self) -> str:
+        """Returns the current status string.
+
+        :returns: The current status string
+        :rtype: str
+        """
+        return self._polling_method.status()
+
+    async def result(self) -> PollingReturnType_co:
+        """Return the result of the long running operation.
+
+        :returns: The deserialized resource of the long running operation, if one is available.
+        :rtype: any or None
+        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
+        """
+        await self.wait()
+        return self._polling_method.resource()
+
+    def __await__(self) -> Generator[Any, None, PollingReturnType_co]:
+        return self.result().__await__()
+
+    async def wait(self) -> None:
+        """Wait on the long running operation.
+
+        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
+        """
+        try:
+            await self._polling_method.run()
+        except AzureError as error:
+            if not error.continuation_token:
+                try:
+                    error.continuation_token = self.continuation_token()
+                except Exception as err:  # pylint: disable=broad-except
+                    _LOGGER.warning("Unable to retrieve continuation token: %s", err)
+                    error.continuation_token = None
+            raise
+        self._done = True
+
+    def done(self) -> bool:
+        """Check status of the long running operation.
+
+        :returns: 'True' if the process has completed, else 'False'.
+        :rtype: bool
+        """
+        return self._done
diff --git a/.venv/lib/python3.12/site-packages/azure/core/polling/_poller.py b/.venv/lib/python3.12/site-packages/azure/core/polling/_poller.py
new file mode 100644
index 00000000..8b8e651e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/polling/_poller.py
@@ -0,0 +1,306 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import base64
+import logging
+import threading
+import uuid
+from typing import TypeVar, Generic, Any, Callable, Optional, Tuple, List
+from azure.core.exceptions import AzureError
+from azure.core.tracing.decorator import distributed_trace
+from azure.core.tracing.common import with_current_context
+
+
+PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
+DeserializationCallbackType = Any
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class PollingMethod(Generic[PollingReturnType_co]):
+    """ABC class for polling method."""
+
+    def initialize(
+        self,
+        client: Any,
+        initial_response: Any,
+        deserialization_callback: DeserializationCallbackType,
+    ) -> None:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def run(self) -> None:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def status(self) -> str:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def finished(self) -> bool:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def resource(self) -> PollingReturnType_co:
+        raise NotImplementedError("This method needs to be implemented")
+
+    def get_continuation_token(self) -> str:
+        raise TypeError("Polling method '{}' doesn't support get_continuation_token".format(self.__class__.__name__))
+
+    @classmethod
+    def from_continuation_token(
+        cls, continuation_token: str, **kwargs: Any
+    ) -> Tuple[Any, Any, DeserializationCallbackType]:
+        raise TypeError("Polling method '{}' doesn't support from_continuation_token".format(cls.__name__))
+
+
+class _SansIONoPolling(Generic[PollingReturnType_co]):
+    _deserialization_callback: Callable[[Any], PollingReturnType_co]
+    """Deserialization callback passed during initialization"""
+
+    def __init__(self):
+        self._initial_response = None
+
+    def initialize(
+        self,
+        _: Any,
+        initial_response: Any,
+        deserialization_callback: Callable[[Any], PollingReturnType_co],
+    ) -> None:
+        self._initial_response = initial_response
+        self._deserialization_callback = deserialization_callback
+
+    def status(self) -> str:
+        """Return the current status.
+
+        :rtype: str
+        :return: The current status
+        """
+        return "succeeded"
+
+    def finished(self) -> bool:
+        """Is this polling finished?
+
+        :rtype: bool
+        :return: Whether this polling is finished
+        """
+        return True
+
+    def resource(self) -> PollingReturnType_co:
+        return self._deserialization_callback(self._initial_response)
+
+    def get_continuation_token(self) -> str:
+        import pickle
+
+        return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii")
+
+    @classmethod
+    def from_continuation_token(
+        cls, continuation_token: str, **kwargs: Any
+    ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
+        try:
+            deserialization_callback = kwargs["deserialization_callback"]
+        except KeyError:
+            raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None
+        import pickle
+
+        initial_response = pickle.loads(base64.b64decode(continuation_token))  # nosec
+        return None, initial_response, deserialization_callback
+
+
+class NoPolling(_SansIONoPolling[PollingReturnType_co], PollingMethod[PollingReturnType_co]):
+    """An empty poller that returns the deserialized initial response."""
+
+    def run(self) -> None:
+        """Empty run, no polling."""
+
+
+class LROPoller(Generic[PollingReturnType_co]):
+    """Poller for long running operations.
+
+    :param client: A pipeline service client
+    :type client: ~azure.core.PipelineClient
+    :param initial_response: The initial call response
+    :type initial_response: ~azure.core.pipeline.PipelineResponse
+    :param deserialization_callback: A callback that takes a Response and return a deserialized object.
+                                     If a subclass of Model is given, this passes "deserialize" as callback.
+    :type deserialization_callback: callable or msrest.serialization.Model
+    :param polling_method: The polling strategy to adopt
+    :type polling_method: ~azure.core.polling.PollingMethod
+    """
+
+    def __init__(
+        self,
+        client: Any,
+        initial_response: Any,
+        deserialization_callback: Callable[[Any], PollingReturnType_co],
+        polling_method: PollingMethod[PollingReturnType_co],
+    ) -> None:
+        self._callbacks: List[Callable] = []
+        self._polling_method = polling_method
+
+        # This implicit test avoids bringing in an explicit dependency on Model directly
+        try:
+            deserialization_callback = deserialization_callback.deserialize  # type: ignore
+        except AttributeError:
+            pass
+
+        # Might raise a CloudError
+        self._polling_method.initialize(client, initial_response, deserialization_callback)
+
+        # Prepare thread execution
+        self._thread = None
+        self._done = threading.Event()
+        self._exception = None
+        if self._polling_method.finished():
+            self._done.set()
+        else:
+            self._thread = threading.Thread(
+                target=with_current_context(self._start),
+                name="LROPoller({})".format(uuid.uuid4()),
+            )
+            self._thread.daemon = True
+            self._thread.start()
+
+    def _start(self):
+        """Start the long running operation.
+        On completion, runs any callbacks.
+        """
+        try:
+            self._polling_method.run()
+        except AzureError as error:
+            if not error.continuation_token:
+                try:
+                    error.continuation_token = self.continuation_token()
+                except Exception as err:  # pylint: disable=broad-except
+                    _LOGGER.warning("Unable to retrieve continuation token: %s", err)
+                    error.continuation_token = None
+
+            self._exception = error
+        except Exception as error:  # pylint: disable=broad-except
+            self._exception = error
+
+        finally:
+            self._done.set()
+
+        callbacks, self._callbacks = self._callbacks, []
+        while callbacks:
+            for call in callbacks:
+                call(self._polling_method)
+            callbacks, self._callbacks = self._callbacks, []
+
+    def polling_method(self) -> PollingMethod[PollingReturnType_co]:
+        """Return the polling method associated to this poller.
+
+        :return: The polling method
+        :rtype: ~azure.core.polling.PollingMethod
+        """
+        return self._polling_method
+
+    def continuation_token(self) -> str:
+        """Return a continuation token that allows to restart the poller later.
+
+        :returns: An opaque continuation token
+        :rtype: str
+        """
+        return self._polling_method.get_continuation_token()
+
+    @classmethod
+    def from_continuation_token(
+        cls, polling_method: PollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any
+    ) -> "LROPoller[PollingReturnType_co]":
+        (
+            client,
+            initial_response,
+            deserialization_callback,
+        ) = polling_method.from_continuation_token(continuation_token, **kwargs)
+        return cls(client, initial_response, deserialization_callback, polling_method)
+
+    def status(self) -> str:
+        """Returns the current status string.
+
+        :returns: The current status string
+        :rtype: str
+        """
+        return self._polling_method.status()
+
+    def result(self, timeout: Optional[float] = None) -> PollingReturnType_co:
+        """Return the result of the long running operation, or
+        the result available after the specified timeout.
+
+        :param float timeout: Period of time to wait before getting back control.
+        :returns: The deserialized resource of the long running operation, if one is available.
+        :rtype: any or None
+        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
+        """
+        self.wait(timeout)
+        return self._polling_method.resource()
+
+    @distributed_trace
+    def wait(self, timeout: Optional[float] = None) -> None:
+        """Wait on the long running operation for a specified length
+        of time. You can check if this call as ended with timeout with the
+        "done()" method.
+
+        :param float timeout: Period of time to wait for the long running
+         operation to complete (in seconds).
+        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
+        """
+        if self._thread is None:
+            return
+        self._thread.join(timeout=timeout)
+        try:
+            # Let's handle possible None in forgiveness here
+            # https://github.com/python/mypy/issues/8165
+            raise self._exception  # type: ignore
+        except TypeError:  # Was None
+            pass
+
+    def done(self) -> bool:
+        """Check status of the long running operation.
+
+        :returns: 'True' if the process has completed, else 'False'.
+        :rtype: bool
+        """
+        return self._thread is None or not self._thread.is_alive()
+
+    def add_done_callback(self, func: Callable) -> None:
+        """Add callback function to be run once the long running operation
+        has completed - regardless of the status of the operation.
+
+        :param callable func: Callback function that takes at least one
+         argument, a completed LongRunningOperation.
+        """
+        # Still use "_done" and not "done", since CBs are executed inside the thread.
+        if self._done.is_set():
+            func(self._polling_method)
+        # Let's add them still, for consistency (if you wish to access to it for some reasons)
+        self._callbacks.append(func)
+
+    def remove_done_callback(self, func: Callable) -> None:
+        """Remove a callback from the long running operation.
+
+        :param callable func: The function to be removed from the callbacks.
+        :raises ValueError: if the long running operation has already completed.
+        """
+        if self._done is None or self._done.is_set():
+            raise ValueError("Process is complete.")
+        self._callbacks = [c for c in self._callbacks if c != func]
diff --git a/.venv/lib/python3.12/site-packages/azure/core/polling/async_base_polling.py b/.venv/lib/python3.12/site-packages/azure/core/polling/async_base_polling.py
new file mode 100644
index 00000000..1a594672
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/polling/async_base_polling.py
@@ -0,0 +1,182 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+from typing import TypeVar, cast, Union
+from ..exceptions import HttpResponseError
+from .base_polling import (
+    _failed,
+    BadStatus,
+    BadResponse,
+    OperationFailed,
+    _SansIOLROBasePolling,
+    _raise_if_bad_http_status_and_method,
+)
+from ._async_poller import AsyncPollingMethod
+from ..pipeline._tools import is_rest
+from .. import AsyncPipelineClient
+from ..pipeline import PipelineResponse
+from ..pipeline.transport import (
+    HttpRequest as LegacyHttpRequest,
+    AsyncHttpTransport,
+    AsyncHttpResponse as LegacyAsyncHttpResponse,
+)
+from ..rest import HttpRequest, AsyncHttpResponse
+
+HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
+AsyncHttpResponseType = Union[LegacyAsyncHttpResponse, AsyncHttpResponse]
+HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
+AsyncHttpResponseTypeVar = TypeVar("AsyncHttpResponseTypeVar", bound=AsyncHttpResponseType)
+
+
+PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
+
+__all__ = ["AsyncLROBasePolling"]
+
+
+class AsyncLROBasePolling(
+    _SansIOLROBasePolling[
+        PollingReturnType_co,
+        AsyncPipelineClient[HttpRequestTypeVar, AsyncHttpResponseTypeVar],
+        HttpRequestTypeVar,
+        AsyncHttpResponseTypeVar,
+    ],
+    AsyncPollingMethod[PollingReturnType_co],
+):
+    """A base LRO async poller.
+
+    This assumes a basic flow:
+    - I analyze the response to decide the polling approach
+    - I poll
+    - I ask the final resource depending of the polling approach
+
+    If your polling need are more specific, you could implement a PollingMethod directly
+    """
+
+    _initial_response: PipelineResponse[HttpRequestTypeVar, AsyncHttpResponseTypeVar]
+    """Store the initial response."""
+
+    _pipeline_response: PipelineResponse[HttpRequestTypeVar, AsyncHttpResponseTypeVar]
+    """Store the latest received HTTP response, initialized by the first answer."""
+
+    @property
+    def _transport(
+        self,
+    ) -> AsyncHttpTransport[HttpRequestTypeVar, AsyncHttpResponseTypeVar]:
+        return self._client._pipeline._transport  # pylint: disable=protected-access
+
+    async def run(self) -> None:
+        try:
+            await self._poll()
+
+        except BadStatus as err:
+            self._status = "Failed"
+            raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err
+
+        except BadResponse as err:
+            self._status = "Failed"
+            raise HttpResponseError(
+                response=self._pipeline_response.http_response,
+                message=str(err),
+                error=err,
+            ) from err
+
+        except OperationFailed as err:
+            raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err
+
+    async def _poll(self) -> None:
+        """Poll status of operation so long as operation is incomplete and
+        we have an endpoint to query.
+
+        :raises: OperationFailed if operation status 'Failed' or 'Canceled'.
+        :raises: BadStatus if response status invalid.
+        :raises: BadResponse if response invalid.
+        """
+        if not self.finished():
+            await self.update_status()
+        while not self.finished():
+            await self._delay()
+            await self.update_status()
+
+        if _failed(self.status()):
+            raise OperationFailed("Operation failed or canceled")
+
+        final_get_url = self._operation.get_final_get_url(self._pipeline_response)
+        if final_get_url:
+            self._pipeline_response = await self.request_status(final_get_url)
+            _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
+
+    async def _sleep(self, delay: float) -> None:
+        await self._transport.sleep(delay)
+
+    async def _delay(self) -> None:
+        """Check for a 'retry-after' header to set timeout,
+        otherwise use configured timeout.
+        """
+        delay = self._extract_delay()
+        await self._sleep(delay)
+
+    async def update_status(self) -> None:
+        """Update the current status of the LRO."""
+        self._pipeline_response = await self.request_status(self._operation.get_polling_url())
+        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
+        self._status = self._operation.get_status(self._pipeline_response)
+
+    async def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, AsyncHttpResponseTypeVar]:
+        """Do a simple GET to this status link.
+
+        This method re-inject 'x-ms-client-request-id'.
+
+        :param str status_link: URL to poll.
+        :rtype: azure.core.pipeline.PipelineResponse
+        :return: The response of the status request.
+        """
+        if self._path_format_arguments:
+            status_link = self._client.format_url(status_link, **self._path_format_arguments)
+        # Re-inject 'x-ms-client-request-id' while polling
+        if "request_id" not in self._operation_config:
+            self._operation_config["request_id"] = self._get_request_id()
+
+        if is_rest(self._initial_response.http_response):
+            rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
+            # Need a cast, as "_return_pipeline_response" mutate the return type, and that return type is not
+            # declared in the typing of "send_request"
+            return cast(
+                PipelineResponse[HttpRequestTypeVar, AsyncHttpResponseTypeVar],
+                await self._client.send_request(rest_request, _return_pipeline_response=True, **self._operation_config),
+            )
+
+        # Legacy HttpRequest and AsyncHttpResponse from azure.core.pipeline.transport
+        # casting things here, as we don't want the typing system to know
+        # about the legacy APIs.
+        request = cast(HttpRequestTypeVar, self._client.get(status_link))
+        return cast(
+            PipelineResponse[HttpRequestTypeVar, AsyncHttpResponseTypeVar],
+            await self._client._pipeline.run(  # pylint: disable=protected-access
+                request, stream=False, **self._operation_config
+            ),
+        )
+
+
+__all__ = ["AsyncLROBasePolling"]
diff --git a/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py b/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py
new file mode 100644
index 00000000..91a12a84
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/polling/base_polling.py
@@ -0,0 +1,888 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import abc
+import base64
+import json
+from enum import Enum
+from typing import (
+    Optional,
+    Any,
+    Tuple,
+    Callable,
+    Dict,
+    Sequence,
+    Generic,
+    TypeVar,
+    cast,
+    Union,
+)
+
+from ..exceptions import HttpResponseError, DecodeError
+from . import PollingMethod
+from ..pipeline.policies._utils import get_retry_after
+from ..pipeline._tools import is_rest
+from .._enum_meta import CaseInsensitiveEnumMeta
+from .. import PipelineClient
+from ..pipeline import PipelineResponse
+from ..pipeline.transport import (
+    HttpTransport,
+    HttpRequest as LegacyHttpRequest,
+    HttpResponse as LegacyHttpResponse,
+    AsyncHttpResponse as LegacyAsyncHttpResponse,
+)
+from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse
+
+
+HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
+HttpResponseType = Union[LegacyHttpResponse, HttpResponse]  # Sync only
+AllHttpResponseType = Union[
+    LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
+]  # Sync or async
+LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse]
+NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse]
+PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType]
+HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
+HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType)  # Sync only
+AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType)  # Sync or async
+
+ABC = abc.ABC
+PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
+PipelineClientType = TypeVar("PipelineClientType")
+HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True)
+HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True)
+
+
+_FINISHED = frozenset(["succeeded", "canceled", "failed"])
+_FAILED = frozenset(["canceled", "failed"])
+_SUCCEEDED = frozenset(["succeeded"])
+
+
+def _get_content(response: AllHttpResponseType) -> bytes:
+    """Get the content of this response. This is designed specifically to avoid
+    a warning of mypy for body() access, as this method is deprecated.
+
+    :param response: The response object.
+    :type response: any
+    :return: The content of this response.
+    :rtype: bytes
+    """
+    if isinstance(response, (LegacyHttpResponse, LegacyAsyncHttpResponse)):
+        return response.body()
+    return response.content
+
+
+def _finished(status):
+    if hasattr(status, "value"):
+        status = status.value
+    return str(status).lower() in _FINISHED
+
+
+def _failed(status):
+    if hasattr(status, "value"):
+        status = status.value
+    return str(status).lower() in _FAILED
+
+
+def _succeeded(status):
+    if hasattr(status, "value"):
+        status = status.value
+    return str(status).lower() in _SUCCEEDED
+
+
+class BadStatus(Exception):
+    pass
+
+
+class BadResponse(Exception):
+    pass
+
+
+class OperationFailed(Exception):
+    pass
+
+
+def _as_json(response: AllHttpResponseType) -> Dict[str, Any]:
+    """Assuming this is not empty, return the content as JSON.
+
+    Result/exceptions is not determined if you call this method without testing _is_empty.
+
+    :param response: The response object.
+    :type response: any
+    :return: The content of this response as dict.
+    :rtype: dict
+    :raises: DecodeError if response body contains invalid json data.
+    """
+    try:
+        return json.loads(response.text())
+    except ValueError as err:
+        raise DecodeError("Error occurred in deserializing the response body.") from err
+
+
+def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None:
+    """Check response status code is valid.
+
+    Must be 200, 201, 202, or 204.
+
+    :param response: The response object.
+    :type response: any
+    :raises: BadStatus if invalid status.
+    """
+    code = response.status_code
+    if code in {200, 201, 202, 204}:
+        return
+    raise BadStatus("Invalid return status {!r} for {!r} operation".format(code, response.request.method))
+
+
+def _is_empty(response: AllHttpResponseType) -> bool:
+    """Check if response body contains meaningful content.
+
+    :param response: The response object.
+    :type response: any
+    :return: True if response body is empty, False otherwise.
+    :rtype: bool
+    """
+    return not bool(_get_content(response))
+
+
+class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]):
+    """Protocol to implement for a long running operation algorithm."""
+
+    @abc.abstractmethod
+    def can_poll(
+        self,
+        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
+    ) -> bool:
+        """Answer if this polling method could be used.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: True if this polling method could be used, False otherwise.
+        :rtype: bool
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_polling_url(self) -> str:
+        """Return the polling URL.
+
+        :return: The polling URL.
+        :rtype: str
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def set_initial_status(
+        self,
+        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
+    ) -> str:
+        """Process first response after initiating long running operation.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The initial status.
+        :rtype: str
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_status(
+        self,
+        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
+    ) -> str:
+        """Return the status string extracted from this response.
+
+        :param pipeline_response: The response object.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The status string.
+        :rtype: str
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_final_get_url(
+        self,
+        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
+    ) -> Optional[str]:
+        """If a final GET is needed, returns the URL.
+
+        :param pipeline_response: Success REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The URL to the final GET, or None if no final GET is needed.
+        :rtype: str or None
+        """
+        raise NotImplementedError()
+
+
+class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
+    """Known LRO options from Swagger."""
+
+    FINAL_STATE_VIA = "final-state-via"
+
+
+class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
+    """Possible final-state-via options."""
+
+    AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
+    LOCATION_FINAL_STATE = "location"
+    OPERATION_LOCATION_FINAL_STATE = "operation-location"
+
+
+class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
+    """Implements a operation resource polling, typically from Operation-Location.
+
+    :param str operation_location_header: Name of the header to return operation format (default 'operation-location')
+    :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see
+     https://aka.ms/azsdk/autorest/openapi/lro-options
+    """
+
+    _async_url: str
+    """Url to resource monitor (AzureAsyncOperation or Operation-Location)"""
+
+    _location_url: Optional[str]
+    """Location header if present"""
+
+    _request: Any
+    """The initial request done"""
+
+    def __init__(
+        self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None
+    ):
+        self._operation_location_header = operation_location_header
+        self._location_url = None
+        self._lro_options = lro_options or {}
+
+    def can_poll(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> bool:
+        """Check if status monitor header (e.g. Operation-Location) is present.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: True if this polling method could be used, False otherwise.
+        :rtype: bool
+        """
+        response = pipeline_response.http_response
+        return self._operation_location_header in response.headers
+
+    def get_polling_url(self) -> str:
+        """Return the polling URL.
+
+        Will extract it from the defined header to read (e.g. Operation-Location)
+
+        :return: The polling URL.
+        :rtype: str
+        """
+        return self._async_url
+
+    def get_final_get_url(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> Optional[str]:
+        """If a final GET is needed, returns the URL.
+
+        :param pipeline_response: Success REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The URL to the final GET, or None if no final GET is needed.
+        :rtype: str or None
+        """
+        if (
+            self._lro_options.get(_LroOption.FINAL_STATE_VIA) == _FinalStateViaOption.LOCATION_FINAL_STATE
+            and self._location_url
+        ):
+            return self._location_url
+        if (
+            self._lro_options.get(_LroOption.FINAL_STATE_VIA)
+            in [
+                _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE,
+                _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE,
+            ]
+            and self._request.method == "POST"
+        ):
+            return None
+        response = pipeline_response.http_response
+        if not _is_empty(response):
+            body = _as_json(response)
+            # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location
+            resource_location = body.get("resourceLocation")
+            if resource_location:
+                return resource_location
+
+        if self._request.method in {"PUT", "PATCH"}:
+            return self._request.url
+
+        if self._request.method == "POST" and self._location_url:
+            return self._location_url
+
+        return None
+
+    def set_initial_status(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> str:
+        """Process first response after initiating long running operation.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The initial status.
+        :rtype: str
+        """
+        self._request = pipeline_response.http_response.request
+        response = pipeline_response.http_response
+
+        self._set_async_url_if_present(response)
+
+        if response.status_code in {200, 201, 202, 204} and self._async_url:
+            # Check if we can extract status from initial response, if present
+            try:
+                return self.get_status(pipeline_response)
+            # Wide catch, it may not even be JSON at all, deserialization is lenient
+            except Exception:  # pylint: disable=broad-except
+                pass
+            return "InProgress"
+        raise OperationFailed("Operation failed or canceled")
+
+    def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None:
+        self._async_url = response.headers[self._operation_location_header]
+
+        location_url = response.headers.get("location")
+        if location_url:
+            self._location_url = location_url
+
+    def get_status(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> str:
+        """Process the latest status update retrieved from an "Operation-Location" header.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The status string.
+        :rtype: str
+        :raises: BadResponse if response has no body, or body does not contain status.
+        """
+        response = pipeline_response.http_response
+        if _is_empty(response):
+            raise BadResponse("The response from long running operation does not contain a body.")
+
+        body = _as_json(response)
+        status = body.get("status")
+        if not status:
+            raise BadResponse("No status found in body")
+        return status
+
+
+class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
+    """Implements a Location polling."""
+
+    _location_url: str
+    """Location header"""
+
+    def can_poll(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> bool:
+        """True if contains a Location header
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: True if this polling method could be used, False otherwise.
+        :rtype: bool
+        """
+        response = pipeline_response.http_response
+        return "location" in response.headers
+
+    def get_polling_url(self) -> str:
+        """Return the Location header value.
+
+        :return: The polling URL.
+        :rtype: str
+        """
+        return self._location_url
+
+    def get_final_get_url(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> Optional[str]:
+        """If a final GET is needed, returns the URL.
+
+        Always return None for a Location polling.
+
+        :param pipeline_response: Success REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: Always None for this implementation.
+        :rtype: None
+        """
+        return None
+
+    def set_initial_status(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> str:
+        """Process first response after initiating long running operation.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The initial status.
+        :rtype: str
+        """
+        response = pipeline_response.http_response
+
+        self._location_url = response.headers["location"]
+
+        if response.status_code in {200, 201, 202, 204} and self._location_url:
+            return "InProgress"
+        raise OperationFailed("Operation failed or canceled")
+
+    def get_status(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> str:
+        """Return the status string extracted from this response.
+
+        For Location polling, it means the status monitor returns 202.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The status string.
+        :rtype: str
+        """
+        response = pipeline_response.http_response
+        if "location" in response.headers:
+            self._location_url = response.headers["location"]
+
+        return "InProgress" if response.status_code == 202 else "Succeeded"
+
+
+class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
+    """Should be the fallback polling, that don't poll but exit successfully
+    if not other polling are detected and status code is 2xx.
+    """
+
+    def can_poll(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> bool:
+        """Answer if this polling method could be used.
+
+        For this implementation, always True.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: True if this polling method could be used, False otherwise.
+        :rtype: bool
+        """
+        return True
+
+    def get_polling_url(self) -> str:
+        """Return the polling URL.
+
+        This is not implemented for this polling, since we're never supposed to loop.
+
+        :return: The polling URL.
+        :rtype: str
+        """
+        raise ValueError("This polling doesn't support polling url")
+
+    def set_initial_status(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> str:
+        """Process first response after initiating long running operation.
+
+        Will succeed immediately.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The initial status.
+        :rtype: str
+        """
+        return "Succeeded"
+
+    def get_status(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> str:
+        """Return the status string extracted from this response.
+
+        Only possible status is success.
+
+        :param pipeline_response: Initial REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The status string.
+        :rtype: str
+        """
+        return "Succeeded"
+
+    def get_final_get_url(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> Optional[str]:
+        """If a final GET is needed, returns the URL.
+
+        :param pipeline_response: Success REST call response.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :rtype: str
+        :return: Always None for this implementation.
+        """
+        return None
+
+
+class _SansIOLROBasePolling(
+    Generic[
+        PollingReturnType_co,
+        PipelineClientType,
+        HttpRequestTypeVar,
+        AllHttpResponseTypeVar,
+    ]
+):  # pylint: disable=too-many-instance-attributes
+    """A base class that has no opinion on IO, to help mypy be accurate.
+
+    :param float timeout: Default polling internal in absence of Retry-After header, in seconds.
+    :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use.
+    :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring.
+    :type lro_options: dict[str, any]
+    :param path_format_arguments: A dictionary of the format arguments to be used to format the URL.
+    :type path_format_arguments: dict[str, str]
+    """
+
+    _deserialization_callback: Callable[[Any], PollingReturnType_co]
+    """The deserialization callback that returns the final instance."""
+
+    _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]
+    """The algorithm this poller has decided to use. Will loop through 'can_poll' of the input algorithms to decide."""
+
+    _status: str
+    """Hold the current status of this poller"""
+
+    _client: PipelineClientType
+    """The Azure Core Pipeline client used to make request."""
+
+    def __init__(
+        self,
+        timeout: float = 30,
+        lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None,
+        lro_options: Optional[Dict[str, Any]] = None,
+        path_format_arguments: Optional[Dict[str, str]] = None,
+        **operation_config: Any
+    ):
+        self._lro_algorithms = lro_algorithms or [
+            OperationResourcePolling(lro_options=lro_options),
+            LocationPolling(),
+            StatusCheckPolling(),
+        ]
+
+        self._timeout = timeout
+        self._operation_config = operation_config
+        self._lro_options = lro_options
+        self._path_format_arguments = path_format_arguments
+
+    def initialize(
+        self,
+        client: PipelineClientType,
+        initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+        deserialization_callback: Callable[
+            [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]],
+            PollingReturnType_co,
+        ],
+    ) -> None:
+        """Set the initial status of this LRO.
+
+        :param client: The Azure Core Pipeline client used to make request.
+        :type client: ~azure.core.pipeline.PipelineClient
+        :param initial_response: The initial response for the call.
+        :type initial_response: ~azure.core.pipeline.PipelineResponse
+        :param deserialization_callback: A callback function to deserialize the final response.
+        :type deserialization_callback: callable
+        :raises: HttpResponseError if initial status is incorrect LRO state
+        """
+        self._client = client
+        self._pipeline_response = (  # pylint: disable=attribute-defined-outside-init
+            self._initial_response  # pylint: disable=attribute-defined-outside-init
+        ) = initial_response
+        self._deserialization_callback = deserialization_callback
+
+        for operation in self._lro_algorithms:
+            if operation.can_poll(initial_response):
+                self._operation = operation
+                break
+        else:
+            raise BadResponse("Unable to find status link for polling.")
+
+        try:
+            _raise_if_bad_http_status_and_method(self._initial_response.http_response)
+            self._status = self._operation.set_initial_status(initial_response)
+
+        except BadStatus as err:
+            self._status = "Failed"
+            raise HttpResponseError(response=initial_response.http_response, error=err) from err
+        except BadResponse as err:
+            self._status = "Failed"
+            raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err
+        except OperationFailed as err:
+            raise HttpResponseError(response=initial_response.http_response, error=err) from err
+
+    def get_continuation_token(self) -> str:
+        import pickle
+
+        return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii")
+
+    @classmethod
+    def from_continuation_token(
+        cls, continuation_token: str, **kwargs: Any
+    ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
+        try:
+            client = kwargs["client"]
+        except KeyError:
+            raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None
+
+        try:
+            deserialization_callback = kwargs["deserialization_callback"]
+        except KeyError:
+            raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None
+
+        import pickle
+
+        initial_response = pickle.loads(base64.b64decode(continuation_token))  # nosec
+        # Restore the transport in the context
+        initial_response.context.transport = client._pipeline._transport  # pylint: disable=protected-access
+        return client, initial_response, deserialization_callback
+
+    def status(self) -> str:
+        """Return the current status as a string.
+
+        :rtype: str
+        :return: The current status.
+        """
+        if not self._operation:
+            raise ValueError("set_initial_status was never called. Did you give this instance to a poller?")
+        return self._status
+
+    def finished(self) -> bool:
+        """Is this polling finished?
+
+        :rtype: bool
+        :return: True if finished, False otherwise.
+        """
+        return _finished(self.status())
+
+    def resource(self) -> PollingReturnType_co:
+        """Return the built resource.
+
+        :rtype: any
+        :return: The built resource.
+        """
+        return self._parse_resource(self._pipeline_response)
+
+    def _parse_resource(
+        self,
+        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
+    ) -> PollingReturnType_co:
+        """Assuming this response is a resource, use the deserialization callback to parse it.
+        If body is empty, assuming no resource to return.
+
+        :param pipeline_response: The response object.
+        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
+        :return: The parsed resource.
+        :rtype: any
+        """
+        response = pipeline_response.http_response
+        if not _is_empty(response):
+            return self._deserialization_callback(pipeline_response)
+
+        # This "type ignore" has been discussed with architects.
+        # We have a typing problem that if the Swagger/TSP describes a return type (PollingReturnType_co is not None),
+        # BUT the returned payload is actually empty, we don't want to fail, but return None.
+        # To be clean, we would have to make the polling return type Optional "just in case the Swagger/TSP is wrong".
+        # This is reducing the quality and the value of the typing annotations
+        # for a case that is not supposed to happen in the first place. So we decided to ignore the type error here.
+        return None  # type: ignore
+
+    def _get_request_id(self) -> str:
+        return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"]
+
+    def _extract_delay(self) -> float:
+        delay = get_retry_after(self._pipeline_response)
+        if delay:
+            return delay
+        return self._timeout
+
+
+class LROBasePolling(
+    _SansIOLROBasePolling[
+        PollingReturnType_co,
+        PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar],
+        HttpRequestTypeVar,
+        HttpResponseTypeVar,
+    ],
+    PollingMethod[PollingReturnType_co],
+):
+    """A base LRO poller.
+
+    This assumes a basic flow:
+    - I analyze the response to decide the polling approach
+    - I poll
+    - I ask the final resource depending of the polling approach
+
+    If your polling need are more specific, you could implement a PollingMethod directly
+    """
+
+    _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
+    """Store the initial response."""
+
+    _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
+    """Store the latest received HTTP response, initialized by the first answer."""
+
+    @property
+    def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]:
+        return self._client._pipeline._transport  # pylint: disable=protected-access
+
+    def __getattribute__(self, name: str) -> Any:
+        """Find the right method for the job.
+
+        This contains a workaround for azure-mgmt-core 1.0.0 to 1.4.0, where the MRO
+        is changing when azure-core was refactored in 1.27.0. The MRO change was causing
+        AsyncARMPolling to look-up the wrong methods and find the non-async ones.
+
+        :param str name: The name of the attribute to retrieve.
+        :rtype: Any
+        :return: The attribute value.
+        """
+        cls = object.__getattribute__(self, "__class__")
+        if cls.__name__ == "AsyncARMPolling" and name in [
+            "run",
+            "update_status",
+            "request_status",
+            "_sleep",
+            "_delay",
+            "_poll",
+        ]:
+            return getattr(super(LROBasePolling, self), name)
+        return super().__getattribute__(name)
+
+    def run(self) -> None:
+        try:
+            self._poll()
+
+        except BadStatus as err:
+            self._status = "Failed"
+            raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err
+
+        except BadResponse as err:
+            self._status = "Failed"
+            raise HttpResponseError(
+                response=self._pipeline_response.http_response,
+                message=str(err),
+                error=err,
+            ) from err
+
+        except OperationFailed as err:
+            raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err
+
+    def _poll(self) -> None:
+        """Poll status of operation so long as operation is incomplete and
+        we have an endpoint to query.
+
+        :raises: OperationFailed if operation status 'Failed' or 'Canceled'.
+        :raises: BadStatus if response status invalid.
+        :raises: BadResponse if response invalid.
+        """
+        if not self.finished():
+            self.update_status()
+        while not self.finished():
+            self._delay()
+            self.update_status()
+
+        if _failed(self.status()):
+            raise OperationFailed("Operation failed or canceled")
+
+        final_get_url = self._operation.get_final_get_url(self._pipeline_response)
+        if final_get_url:
+            self._pipeline_response = self.request_status(final_get_url)
+            _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
+
+    def _sleep(self, delay: float) -> None:
+        self._transport.sleep(delay)
+
+    def _delay(self) -> None:
+        """Check for a 'retry-after' header to set timeout,
+        otherwise use configured timeout.
+        """
+        delay = self._extract_delay()
+        self._sleep(delay)
+
+    def update_status(self) -> None:
+        """Update the current status of the LRO."""
+        self._pipeline_response = self.request_status(self._operation.get_polling_url())
+        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
+        self._status = self._operation.get_status(self._pipeline_response)
+
+    def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]:
+        """Do a simple GET to this status link.
+
+        This method re-inject 'x-ms-client-request-id'.
+
+        :param str status_link: The URL to poll.
+        :rtype: azure.core.pipeline.PipelineResponse
+        :return: The response of the status request.
+        """
+        if self._path_format_arguments:
+            status_link = self._client.format_url(status_link, **self._path_format_arguments)
+        # Re-inject 'x-ms-client-request-id' while polling
+        if "request_id" not in self._operation_config:
+            self._operation_config["request_id"] = self._get_request_id()
+
+        if is_rest(self._initial_response.http_response):
+            rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
+            # Need a cast, as "_return_pipeline_response" mutate the return type, and that return type is not
+            # declared in the typing of "send_request"
+            return cast(
+                PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
+                self._client.send_request(rest_request, _return_pipeline_response=True, **self._operation_config),
+            )
+
+        # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport
+        # casting things here, as we don't want the typing system to know
+        # about the legacy APIs.
+        request = cast(HttpRequestTypeVar, self._client.get(status_link))
+        return cast(
+            PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
+            self._client._pipeline.run(  # pylint: disable=protected-access
+                request, stream=False, **self._operation_config
+            ),
+        )
+
+
+__all__ = [
+    "BadResponse",
+    "BadStatus",
+    "OperationFailed",
+    "LongRunningOperation",
+    "OperationResourcePolling",
+    "LocationPolling",
+    "StatusCheckPolling",
+    "LROBasePolling",
+]