aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/msrest/polling/poller.py
blob: 2c3e6773abf2a84f208028e07bbfe109368164f5 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
import threading
import time
import uuid
try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse

from typing import Any, Callable, Union, List, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    import requests

from ..serialization import Model
from ..service_client import ServiceClient
from ..pipeline import ClientRawResponse

class PollingMethod(object):
    """ABC class for polling method.
    """
    def initialize(self, client, initial_response, deserialization_callback):
        # type: (Any, Any, Any) -> None
        raise NotImplementedError("This method needs to be implemented")

    def run(self):
        # type: () -> None
        raise NotImplementedError("This method needs to be implemented")

    def status(self):
        # type: () -> str
        raise NotImplementedError("This method needs to be implemented")

    def finished(self):
        # type: () -> bool
        raise NotImplementedError("This method needs to be implemented")

    def resource(self):
        # type: () -> Any
        raise NotImplementedError("This method needs to be implemented")

class NoPolling(PollingMethod):
    """An empty poller that returns the deserialized initial response.
    """
    def __init__(self):
        self._initial_response = None
        self._deserialization_callback = None

    def initialize(self, _, initial_response, deserialization_callback):
        # type: (Any, requests.Response, Callable) -> None
        self._initial_response = initial_response
        self._deserialization_callback = deserialization_callback

    def run(self):
        # type: () -> None
        """Empty run, no polling.
        """
        pass

    def status(self):
        # type: () -> str
        """Return the current status as a string.
        :rtype: str
        """
        return "succeeded"

    def finished(self):
        # type: () -> bool
        """Is this polling finished?
        :rtype: bool
        """
        return True

    def resource(self):
        # type: () -> Any
        return self._deserialization_callback(self._initial_response)


class LROPoller(object):
    """Poller for long running operations.

    :param client: A msrest service client. Can be a SDK client and it will be casted to a ServiceClient.
    :type client: msrest.service_client.ServiceClient
    :param initial_response: The initial call response
    :type initial_response: requests.Response or msrest.pipeline.ClientRawResponse
    :param deserialization_callback: A callback that takes a Response and return a deserialized object. If a subclass of Model is given, this passes "deserialize" as callback.
    :type deserialization_callback: callable or msrest.serialization.Model
    :param polling_method: The polling strategy to adopt
    :type polling_method: msrest.polling.PollingMethod
    """

    def __init__(self, client, initial_response, deserialization_callback, polling_method):
        # type: (Any, Union[ClientRawResponse, requests.Response], Union[Model, Callable[[requests.Response], Model]], PollingMethod) -> None
        try:
            self._client = client if isinstance(client, ServiceClient) else client._client  #  type: ServiceClient
        except AttributeError:
            raise ValueError("Poller client parameter must be a low-level msrest Service Client or a SDK client.")
        self._response = initial_response.response if isinstance(initial_response, ClientRawResponse) else initial_response
        self._callbacks = []  # type: List[Callable]
        self._polling_method = polling_method

        if isinstance(deserialization_callback, type) and issubclass(deserialization_callback, Model):
            deserialization_callback = deserialization_callback.deserialize  # type: ignore

        # Might raise a CloudError
        self._polling_method.initialize(self._client, self._response, deserialization_callback)

        # Prepare thread execution
        self._thread = None
        self._done = None
        self._exception = None
        if not self._polling_method.finished():
            self._done = threading.Event()
            self._thread = threading.Thread(
                target=self._start,
                name="LROPoller({})".format(uuid.uuid4()))
            self._thread.daemon = True
            self._thread.start()

    def _start(self):
        """Start the long running operation.
        On completion, runs any callbacks.

        :param callable update_cmd: The API request to check the status of
         the operation.
        """
        try:
            self._polling_method.run()
        except Exception as err:
            self._exception = err

        finally:
            self._done.set()

        callbacks, self._callbacks = self._callbacks, []
        while callbacks:
            for call in callbacks:
                call(self._polling_method)
            callbacks, self._callbacks = self._callbacks, []

    def status(self):
        # type: () -> str
        """Returns the current status string.

        :returns: The current status string
        :rtype: str
        """
        return self._polling_method.status()

    def result(self, timeout=None):
        # type: (Optional[float]) -> Model
        """Return the result of the long running operation, or
        the result available after the specified timeout.

        :returns: The deserialized resource of the long running operation,
         if one is available.
        :raises CloudError: Server problem with the query.
        """
        self.wait(timeout)
        return self._polling_method.resource()

    def wait(self, timeout=None):
        # type: (Optional[float]) -> None
        """Wait on the long running operation for a specified length
        of time. You can check if this call as ended with timeout with the
        "done()" method.

        :param float timeout: Period of time to wait for the long running
         operation to complete (in seconds).
        :raises CloudError: Server problem with the query.
        """
        if self._thread is None:
            return
        self._thread.join(timeout=timeout)
        try:
            # Let's handle possible None in forgiveness here
            raise self._exception  # type: ignore
        except TypeError: # Was None
            pass

    def done(self):
        # type: () -> bool
        """Check status of the long running operation.

        :returns: 'True' if the process has completed, else 'False'.
        """
        return self._thread is None or not self._thread.is_alive()

    def add_done_callback(self, func):
        # type: (Callable) -> None
        """Add callback function to be run once the long running operation
        has completed - regardless of the status of the operation.

        :param callable func: Callback function that takes at least one
         argument, a completed LongRunningOperation.
        """
        # Still use "_done" and not "done", since CBs are executed inside the thread.
        if self._done is None or self._done.is_set():
            func(self._polling_method)
        # Let's add them still, for consistency (if you wish to access to it for some reasons)
        self._callbacks.append(func)

    def remove_done_callback(self, func):
        # type: (Callable) -> None
        """Remove a callback from the long running operation.

        :param callable func: The function to be removed from the callbacks.
        :raises: ValueError if the long running operation has already
         completed.
        """
        if self._done is None or self._done.is_set():
            raise ValueError("Process is complete.")
        self._callbacks = [c for c in self._callbacks if c != func]