aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/grpc/beta/utilities.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/grpc/beta/utilities.py')
-rw-r--r--.venv/lib/python3.12/site-packages/grpc/beta/utilities.py153
1 files changed, 153 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/grpc/beta/utilities.py b/.venv/lib/python3.12/site-packages/grpc/beta/utilities.py
new file mode 100644
index 00000000..90e54715
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/grpc/beta/utilities.py
@@ -0,0 +1,153 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Utilities for the gRPC Python Beta API."""
+
+import threading
+import time
+
+# implementations is referenced from specification in this module.
+from grpc.beta import implementations # pylint: disable=unused-import
+from grpc.beta import interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import future
+
+_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
+ 'Exception calling connectivity future "done" callback!'
+)
+
+
+class _ChannelReadyFuture(future.Future):
+ def __init__(self, channel):
+ self._condition = threading.Condition()
+ self._channel = channel
+
+ self._matured = False
+ self._cancelled = False
+ self._done_callbacks = []
+
+ def _block(self, timeout):
+ until = None if timeout is None else time.time() + timeout
+ with self._condition:
+ while True:
+ if self._cancelled:
+ raise future.CancelledError()
+ elif self._matured:
+ return
+ else:
+ if until is None:
+ self._condition.wait()
+ else:
+ remaining = until - time.time()
+ if remaining < 0:
+ raise future.TimeoutError()
+ else:
+ self._condition.wait(timeout=remaining)
+
+ def _update(self, connectivity):
+ with self._condition:
+ if (
+ not self._cancelled
+ and connectivity is interfaces.ChannelConnectivity.READY
+ ):
+ self._matured = True
+ self._channel.unsubscribe(self._update)
+ self._condition.notify_all()
+ done_callbacks = tuple(self._done_callbacks)
+ self._done_callbacks = None
+ else:
+ return
+
+ for done_callback in done_callbacks:
+ callable_util.call_logging_exceptions(
+ done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self
+ )
+
+ def cancel(self):
+ with self._condition:
+ if not self._matured:
+ self._cancelled = True
+ self._channel.unsubscribe(self._update)
+ self._condition.notify_all()
+ done_callbacks = tuple(self._done_callbacks)
+ self._done_callbacks = None
+ else:
+ return False
+
+ for done_callback in done_callbacks:
+ callable_util.call_logging_exceptions(
+ done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self
+ )
+
+ return True
+
+ def cancelled(self):
+ with self._condition:
+ return self._cancelled
+
+ def running(self):
+ with self._condition:
+ return not self._cancelled and not self._matured
+
+ def done(self):
+ with self._condition:
+ return self._cancelled or self._matured
+
+ def result(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def exception(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def traceback(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def add_done_callback(self, fn):
+ with self._condition:
+ if not self._cancelled and not self._matured:
+ self._done_callbacks.append(fn)
+ return
+
+ fn(self)
+
+ def start(self):
+ with self._condition:
+ self._channel.subscribe(self._update, try_to_connect=True)
+
+ def __del__(self):
+ with self._condition:
+ if not self._cancelled and not self._matured:
+ self._channel.unsubscribe(self._update)
+
+
+def channel_ready_future(channel):
+ """Creates a future.Future tracking when an implementations.Channel is ready.
+
+ Cancelling the returned future.Future does not tell the given
+ implementations.Channel to abandon attempts it may have been making to
+ connect; cancelling merely deactivates the return future.Future's
+ subscription to the given implementations.Channel's connectivity.
+
+ Args:
+ channel: An implementations.Channel.
+
+ Returns:
+ A future.Future that matures when the given Channel has connectivity
+ interfaces.ChannelConnectivity.READY.
+ """
+ ready_future = _ChannelReadyFuture(channel)
+ ready_future.start()
+ return ready_future