about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/grpc/_channel.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/grpc/_channel.py')
-rw-r--r--.venv/lib/python3.12/site-packages/grpc/_channel.py2267
1 files changed, 2267 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/grpc/_channel.py b/.venv/lib/python3.12/site-packages/grpc/_channel.py
new file mode 100644
index 00000000..bf29982c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/grpc/_channel.py
@@ -0,0 +1,2267 @@
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Invocation-side implementation of gRPC Python."""
+
+import copy
+import functools
+import logging
+import os
+import sys
+import threading
+import time
+import types
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterator,
+    List,
+    Optional,
+    Sequence,
+    Set,
+    Tuple,
+    Union,
+)
+
+import grpc  # pytype: disable=pyi-error
+from grpc import _common  # pytype: disable=pyi-error
+from grpc import _compression  # pytype: disable=pyi-error
+from grpc import _grpcio_metadata  # pytype: disable=pyi-error
+from grpc import _observability  # pytype: disable=pyi-error
+from grpc._cython import cygrpc
+from grpc._typing import ChannelArgumentType
+from grpc._typing import DeserializingFunction
+from grpc._typing import IntegratedCallFactory
+from grpc._typing import MetadataType
+from grpc._typing import NullaryCallbackType
+from grpc._typing import ResponseType
+from grpc._typing import SerializingFunction
+from grpc._typing import UserTag
+import grpc.experimental  # pytype: disable=pyi-error
+
+_LOGGER = logging.getLogger(__name__)
+
+_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)
+
+_EMPTY_FLAGS = 0
+
+# NOTE(rbellevi): No guarantees are given about the maintenance of this
+# environment variable.
+_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
+    os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
+)
+
+_UNARY_UNARY_INITIAL_DUE = (
+    cygrpc.OperationType.send_initial_metadata,
+    cygrpc.OperationType.send_message,
+    cygrpc.OperationType.send_close_from_client,
+    cygrpc.OperationType.receive_initial_metadata,
+    cygrpc.OperationType.receive_message,
+    cygrpc.OperationType.receive_status_on_client,
+)
+_UNARY_STREAM_INITIAL_DUE = (
+    cygrpc.OperationType.send_initial_metadata,
+    cygrpc.OperationType.send_message,
+    cygrpc.OperationType.send_close_from_client,
+    cygrpc.OperationType.receive_initial_metadata,
+    cygrpc.OperationType.receive_status_on_client,
+)
+_STREAM_UNARY_INITIAL_DUE = (
+    cygrpc.OperationType.send_initial_metadata,
+    cygrpc.OperationType.receive_initial_metadata,
+    cygrpc.OperationType.receive_message,
+    cygrpc.OperationType.receive_status_on_client,
+)
+_STREAM_STREAM_INITIAL_DUE = (
+    cygrpc.OperationType.send_initial_metadata,
+    cygrpc.OperationType.receive_initial_metadata,
+    cygrpc.OperationType.receive_status_on_client,
+)
+
+_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
+    "Exception calling channel subscription callback!"
+)
+
+_OK_RENDEZVOUS_REPR_FORMAT = (
+    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
+)
+
+_NON_OK_RENDEZVOUS_REPR_FORMAT = (
+    "<{} of RPC that terminated with:\n"
+    "\tstatus = {}\n"
+    '\tdetails = "{}"\n'
+    '\tdebug_error_string = "{}"\n'
+    ">"
+)
+
+
+def _deadline(timeout: Optional[float]) -> Optional[float]:
+    return None if timeout is None else time.time() + timeout
+
+
+def _unknown_code_details(
+    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
+) -> str:
+    return 'Server sent unknown code {} and details "{}"'.format(
+        unknown_cygrpc_code, details
+    )
+
+
+class _RPCState(object):
+    condition: threading.Condition
+    due: Set[cygrpc.OperationType]
+    initial_metadata: Optional[MetadataType]
+    response: Any
+    trailing_metadata: Optional[MetadataType]
+    code: Optional[grpc.StatusCode]
+    details: Optional[str]
+    debug_error_string: Optional[str]
+    cancelled: bool
+    callbacks: List[NullaryCallbackType]
+    fork_epoch: Optional[int]
+    rpc_start_time: Optional[float]  # In relative seconds
+    rpc_end_time: Optional[float]  # In relative seconds
+    method: Optional[str]
+    target: Optional[str]
+
+    def __init__(
+        self,
+        due: Sequence[cygrpc.OperationType],
+        initial_metadata: Optional[MetadataType],
+        trailing_metadata: Optional[MetadataType],
+        code: Optional[grpc.StatusCode],
+        details: Optional[str],
+    ):
+        # `condition` guards all members of _RPCState. `notify_all` is called on
+        # `condition` when the state of the RPC has changed.
+        self.condition = threading.Condition()
+
+        # The cygrpc.OperationType objects representing events due from the RPC's
+        # completion queue. If an operation is in `due`, it is guaranteed that
+        # `operate()` has been called on a corresponding operation. But the
+        # converse is not true. That is, in the case of failed `operate()`
+        # calls, there may briefly be events in `due` that do not correspond to
+        # operations submitted to Core.
+        self.due = set(due)
+        self.initial_metadata = initial_metadata
+        self.response = None
+        self.trailing_metadata = trailing_metadata
+        self.code = code
+        self.details = details
+        self.debug_error_string = None
+        # The following three fields are used for observability.
+        # Updates to those fields do not trigger self.condition.
+        self.rpc_start_time = None
+        self.rpc_end_time = None
+        self.method = None
+        self.target = None
+
+        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
+        # slightly wonky, so they have to be tracked separately from the rest of the
+        # result of the RPC. This field tracks whether cancellation was requested
+        # prior to termination of the RPC.
+        self.cancelled = False
+        self.callbacks = []
+        self.fork_epoch = cygrpc.get_fork_epoch()
+
+    def reset_postfork_child(self):
+        self.condition = threading.Condition()
+
+
+def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
+    if state.code is None:
+        state.code = code
+        state.details = details
+        if state.initial_metadata is None:
+            state.initial_metadata = ()
+        state.trailing_metadata = ()
+
+
+def _handle_event(
+    event: cygrpc.BaseEvent,
+    state: _RPCState,
+    response_deserializer: Optional[DeserializingFunction],
+) -> List[NullaryCallbackType]:
+    callbacks = []
+    for batch_operation in event.batch_operations:
+        operation_type = batch_operation.type()
+        state.due.remove(operation_type)
+        if operation_type == cygrpc.OperationType.receive_initial_metadata:
+            state.initial_metadata = batch_operation.initial_metadata()
+        elif operation_type == cygrpc.OperationType.receive_message:
+            serialized_response = batch_operation.message()
+            if serialized_response is not None:
+                response = _common.deserialize(
+                    serialized_response, response_deserializer
+                )
+                if response is None:
+                    details = "Exception deserializing response!"
+                    _abort(state, grpc.StatusCode.INTERNAL, details)
+                else:
+                    state.response = response
+        elif operation_type == cygrpc.OperationType.receive_status_on_client:
+            state.trailing_metadata = batch_operation.trailing_metadata()
+            if state.code is None:
+                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
+                    batch_operation.code()
+                )
+                if code is None:
+                    state.code = grpc.StatusCode.UNKNOWN
+                    state.details = _unknown_code_details(
+                        code, batch_operation.details()
+                    )
+                else:
+                    state.code = code
+                    state.details = batch_operation.details()
+                    state.debug_error_string = batch_operation.error_string()
+            state.rpc_end_time = time.perf_counter()
+            _observability.maybe_record_rpc_latency(state)
+            callbacks.extend(state.callbacks)
+            state.callbacks = None
+    return callbacks
+
+
+def _event_handler(
+    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
+) -> UserTag:
+    def handle_event(event):
+        with state.condition:
+            callbacks = _handle_event(event, state, response_deserializer)
+            state.condition.notify_all()
+            done = not state.due
+        for callback in callbacks:
+            try:
+                callback()
+            except Exception as e:  # pylint: disable=broad-except
+                # NOTE(rbellevi): We suppress but log errors here so as not to
+                # kill the channel spin thread.
+                logging.error(
+                    "Exception in callback %s: %s", repr(callback.func), repr(e)
+                )
+        return done and state.fork_epoch >= cygrpc.get_fork_epoch()
+
+    return handle_event
+
+
+# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
+# pylint: disable=too-many-statements
+def _consume_request_iterator(
+    request_iterator: Iterator,
+    state: _RPCState,
+    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
+    request_serializer: SerializingFunction,
+    event_handler: Optional[UserTag],
+) -> None:
+    """Consume a request supplied by the user."""
+
+    def consume_request_iterator():  # pylint: disable=too-many-branches
+        # Iterate over the request iterator until it is exhausted or an error
+        # condition is encountered.
+        while True:
+            return_from_user_request_generator_invoked = False
+            try:
+                # The thread may die in user-code. Do not block fork for this.
+                cygrpc.enter_user_request_generator()
+                request = next(request_iterator)
+            except StopIteration:
+                break
+            except Exception:  # pylint: disable=broad-except
+                cygrpc.return_from_user_request_generator()
+                return_from_user_request_generator_invoked = True
+                code = grpc.StatusCode.UNKNOWN
+                details = "Exception iterating requests!"
+                _LOGGER.exception(details)
+                call.cancel(
+                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
+                )
+                _abort(state, code, details)
+                return
+            finally:
+                if not return_from_user_request_generator_invoked:
+                    cygrpc.return_from_user_request_generator()
+            serialized_request = _common.serialize(request, request_serializer)
+            with state.condition:
+                if state.code is None and not state.cancelled:
+                    if serialized_request is None:
+                        code = grpc.StatusCode.INTERNAL
+                        details = "Exception serializing request!"
+                        call.cancel(
+                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
+                            details,
+                        )
+                        _abort(state, code, details)
+                        return
+                    else:
+                        state.due.add(cygrpc.OperationType.send_message)
+                        operations = (
+                            cygrpc.SendMessageOperation(
+                                serialized_request, _EMPTY_FLAGS
+                            ),
+                        )
+                        operating = call.operate(operations, event_handler)
+                        if not operating:
+                            state.due.remove(cygrpc.OperationType.send_message)
+                            return
+
+                        def _done():
+                            return (
+                                state.code is not None
+                                or cygrpc.OperationType.send_message
+                                not in state.due
+                            )
+
+                        _common.wait(
+                            state.condition.wait,
+                            _done,
+                            spin_cb=functools.partial(
+                                cygrpc.block_if_fork_in_progress, state
+                            ),
+                        )
+                        if state.code is not None:
+                            return
+                else:
+                    return
+        with state.condition:
+            if state.code is None:
+                state.due.add(cygrpc.OperationType.send_close_from_client)
+                operations = (
+                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+                )
+                operating = call.operate(operations, event_handler)
+                if not operating:
+                    state.due.remove(
+                        cygrpc.OperationType.send_close_from_client
+                    )
+
+    consumption_thread = cygrpc.ForkManagedThread(
+        target=consume_request_iterator
+    )
+    consumption_thread.setDaemon(True)
+    consumption_thread.start()
+
+
+def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
+    """Calculates error string for RPC."""
+    with rpc_state.condition:
+        if rpc_state.code is None:
+            return "<{} object>".format(class_name)
+        elif rpc_state.code is grpc.StatusCode.OK:
+            return _OK_RENDEZVOUS_REPR_FORMAT.format(
+                class_name, rpc_state.code, rpc_state.details
+            )
+        else:
+            return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
+                class_name,
+                rpc_state.code,
+                rpc_state.details,
+                rpc_state.debug_error_string,
+            )
+
+
+class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
+    """An RPC error not tied to the execution of a particular RPC.
+
+    The RPC represented by the state object must not be in-progress or
+    cancelled.
+
+    Attributes:
+      _state: An instance of _RPCState.
+    """
+
+    _state: _RPCState
+
+    def __init__(self, state: _RPCState):
+        with state.condition:
+            self._state = _RPCState(
+                (),
+                copy.deepcopy(state.initial_metadata),
+                copy.deepcopy(state.trailing_metadata),
+                state.code,
+                copy.deepcopy(state.details),
+            )
+            self._state.response = copy.copy(state.response)
+            self._state.debug_error_string = copy.copy(state.debug_error_string)
+
+    def initial_metadata(self) -> Optional[MetadataType]:
+        return self._state.initial_metadata
+
+    def trailing_metadata(self) -> Optional[MetadataType]:
+        return self._state.trailing_metadata
+
+    def code(self) -> Optional[grpc.StatusCode]:
+        return self._state.code
+
+    def details(self) -> Optional[str]:
+        return _common.decode(self._state.details)
+
+    def debug_error_string(self) -> Optional[str]:
+        return _common.decode(self._state.debug_error_string)
+
+    def _repr(self) -> str:
+        return _rpc_state_string(self.__class__.__name__, self._state)
+
+    def __repr__(self) -> str:
+        return self._repr()
+
+    def __str__(self) -> str:
+        return self._repr()
+
+    def cancel(self) -> bool:
+        """See grpc.Future.cancel."""
+        return False
+
+    def cancelled(self) -> bool:
+        """See grpc.Future.cancelled."""
+        return False
+
+    def running(self) -> bool:
+        """See grpc.Future.running."""
+        return False
+
+    def done(self) -> bool:
+        """See grpc.Future.done."""
+        return True
+
+    def result(
+        self, timeout: Optional[float] = None
+    ) -> Any:  # pylint: disable=unused-argument
+        """See grpc.Future.result."""
+        raise self
+
+    def exception(
+        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
+    ) -> Optional[Exception]:
+        """See grpc.Future.exception."""
+        return self
+
+    def traceback(
+        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
+    ) -> Optional[types.TracebackType]:
+        """See grpc.Future.traceback."""
+        try:
+            raise self
+        except grpc.RpcError:
+            return sys.exc_info()[2]
+
+    def add_done_callback(
+        self,
+        fn: Callable[[grpc.Future], None],
+        timeout: Optional[float] = None,  # pylint: disable=unused-argument
+    ) -> None:
+        """See grpc.Future.add_done_callback."""
+        fn(self)
+
+
+class _Rendezvous(grpc.RpcError, grpc.RpcContext):
+    """An RPC iterator.
+
+    Attributes:
+      _state: An instance of _RPCState.
+      _call: An instance of SegregatedCall or IntegratedCall.
+        In either case, the _call object is expected to have operate, cancel,
+        and next_event methods.
+      _response_deserializer: A callable taking bytes and return a Python
+        object.
+      _deadline: A float representing the deadline of the RPC in seconds. Or
+        possibly None, to represent an RPC with no deadline at all.
+    """
+
+    _state: _RPCState
+    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
+    _response_deserializer: Optional[DeserializingFunction]
+    _deadline: Optional[float]
+
+    def __init__(
+        self,
+        state: _RPCState,
+        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
+        response_deserializer: Optional[DeserializingFunction],
+        deadline: Optional[float],
+    ):
+        super(_Rendezvous, self).__init__()
+        self._state = state
+        self._call = call
+        self._response_deserializer = response_deserializer
+        self._deadline = deadline
+
+    def is_active(self) -> bool:
+        """See grpc.RpcContext.is_active"""
+        with self._state.condition:
+            return self._state.code is None
+
+    def time_remaining(self) -> Optional[float]:
+        """See grpc.RpcContext.time_remaining"""
+        with self._state.condition:
+            if self._deadline is None:
+                return None
+            else:
+                return max(self._deadline - time.time(), 0)
+
+    def cancel(self) -> bool:
+        """See grpc.RpcContext.cancel"""
+        with self._state.condition:
+            if self._state.code is None:
+                code = grpc.StatusCode.CANCELLED
+                details = "Locally cancelled by application!"
+                self._call.cancel(
+                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
+                )
+                self._state.cancelled = True
+                _abort(self._state, code, details)
+                self._state.condition.notify_all()
+                return True
+            else:
+                return False
+
+    def add_callback(self, callback: NullaryCallbackType) -> bool:
+        """See grpc.RpcContext.add_callback"""
+        with self._state.condition:
+            if self._state.callbacks is None:
+                return False
+            else:
+                self._state.callbacks.append(callback)
+                return True
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        return self._next()
+
+    def __next__(self):
+        return self._next()
+
+    def _next(self):
+        raise NotImplementedError()
+
+    def debug_error_string(self) -> Optional[str]:
+        raise NotImplementedError()
+
+    def _repr(self) -> str:
+        return _rpc_state_string(self.__class__.__name__, self._state)
+
+    def __repr__(self) -> str:
+        return self._repr()
+
+    def __str__(self) -> str:
+        return self._repr()
+
+    def __del__(self) -> None:
+        with self._state.condition:
+            if self._state.code is None:
+                self._state.code = grpc.StatusCode.CANCELLED
+                self._state.details = "Cancelled upon garbage collection!"
+                self._state.cancelled = True
+                self._call.cancel(
+                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
+                    self._state.details,
+                )
+                self._state.condition.notify_all()
+
+
+class _SingleThreadedRendezvous(
+    _Rendezvous, grpc.Call, grpc.Future
+):  # pylint: disable=too-many-ancestors
+    """An RPC iterator operating entirely on a single thread.
+
+    The __next__ method of _SingleThreadedRendezvous does not depend on the
+    existence of any other thread, including the "channel spin thread".
+    However, this means that its interface is entirely synchronous. So this
+    class cannot completely fulfill the grpc.Future interface. The result,
+    exception, and traceback methods will never block and will instead raise
+    an exception if calling the method would result in blocking.
+
+    This means that these methods are safe to call from add_done_callback
+    handlers.
+    """
+
+    _state: _RPCState
+
+    def _is_complete(self) -> bool:
+        return self._state.code is not None
+
+    def cancelled(self) -> bool:
+        with self._state.condition:
+            return self._state.cancelled
+
+    def running(self) -> bool:
+        with self._state.condition:
+            return self._state.code is None
+
+    def done(self) -> bool:
+        with self._state.condition:
+            return self._state.code is not None
+
+    def result(self, timeout: Optional[float] = None) -> Any:
+        """Returns the result of the computation or raises its exception.
+
+        This method will never block. Instead, it will raise an exception
+        if calling this method would otherwise result in blocking.
+
+        Since this method will never block, any `timeout` argument passed will
+        be ignored.
+        """
+        del timeout
+        with self._state.condition:
+            if not self._is_complete():
+                raise grpc.experimental.UsageError(
+                    "_SingleThreadedRendezvous only supports result() when the"
+                    " RPC is complete."
+                )
+            if self._state.code is grpc.StatusCode.OK:
+                return self._state.response
+            elif self._state.cancelled:
+                raise grpc.FutureCancelledError()
+            else:
+                raise self
+
+    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
+        """Return the exception raised by the computation.
+
+        This method will never block. Instead, it will raise an exception
+        if calling this method would otherwise result in blocking.
+
+        Since this method will never block, any `timeout` argument passed will
+        be ignored.
+        """
+        del timeout
+        with self._state.condition:
+            if not self._is_complete():
+                raise grpc.experimental.UsageError(
+                    "_SingleThreadedRendezvous only supports exception() when"
+                    " the RPC is complete."
+                )
+            if self._state.code is grpc.StatusCode.OK:
+                return None
+            elif self._state.cancelled:
+                raise grpc.FutureCancelledError()
+            else:
+                return self
+
+    def traceback(
+        self, timeout: Optional[float] = None
+    ) -> Optional[types.TracebackType]:
+        """Access the traceback of the exception raised by the computation.
+
+        This method will never block. Instead, it will raise an exception
+        if calling this method would otherwise result in blocking.
+
+        Since this method will never block, any `timeout` argument passed will
+        be ignored.
+        """
+        del timeout
+        with self._state.condition:
+            if not self._is_complete():
+                raise grpc.experimental.UsageError(
+                    "_SingleThreadedRendezvous only supports traceback() when"
+                    " the RPC is complete."
+                )
+            if self._state.code is grpc.StatusCode.OK:
+                return None
+            elif self._state.cancelled:
+                raise grpc.FutureCancelledError()
+            else:
+                try:
+                    raise self
+                except grpc.RpcError:
+                    return sys.exc_info()[2]
+
+    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
+        with self._state.condition:
+            if self._state.code is None:
+                self._state.callbacks.append(functools.partial(fn, self))
+                return
+
+        fn(self)
+
+    def initial_metadata(self) -> Optional[MetadataType]:
+        """See grpc.Call.initial_metadata"""
+        with self._state.condition:
+            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
+            # to receive initial metadata before any messages.
+            while self._state.initial_metadata is None:
+                self._consume_next_event()
+            return self._state.initial_metadata
+
+    def trailing_metadata(self) -> Optional[MetadataType]:
+        """See grpc.Call.trailing_metadata"""
+        with self._state.condition:
+            if self._state.trailing_metadata is None:
+                raise grpc.experimental.UsageError(
+                    "Cannot get trailing metadata until RPC is completed."
+                )
+            return self._state.trailing_metadata
+
+    def code(self) -> Optional[grpc.StatusCode]:
+        """See grpc.Call.code"""
+        with self._state.condition:
+            if self._state.code is None:
+                raise grpc.experimental.UsageError(
+                    "Cannot get code until RPC is completed."
+                )
+            return self._state.code
+
+    def details(self) -> Optional[str]:
+        """See grpc.Call.details"""
+        with self._state.condition:
+            if self._state.details is None:
+                raise grpc.experimental.UsageError(
+                    "Cannot get details until RPC is completed."
+                )
+            return _common.decode(self._state.details)
+
+    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
+        event = self._call.next_event()
+        with self._state.condition:
+            callbacks = _handle_event(
+                event, self._state, self._response_deserializer
+            )
+            for callback in callbacks:
+                # NOTE(gnossen): We intentionally allow exceptions to bubble up
+                # to the user when running on a single thread.
+                callback()
+        return event
+
+    def _next_response(self) -> Any:
+        while True:
+            self._consume_next_event()
+            with self._state.condition:
+                if self._state.response is not None:
+                    response = self._state.response
+                    self._state.response = None
+                    return response
+                elif (
+                    cygrpc.OperationType.receive_message not in self._state.due
+                ):
+                    if self._state.code is grpc.StatusCode.OK:
+                        raise StopIteration()
+                    elif self._state.code is not None:
+                        raise self
+
+    def _next(self) -> Any:
+        with self._state.condition:
+            if self._state.code is None:
+                # We tentatively add the operation as expected and remove
+                # it if the enqueue operation fails. This allows us to guarantee that
+                # if an event has been submitted to the core completion queue,
+                # it is in `due`. If we waited until after a successful
+                # enqueue operation then a signal could interrupt this
+                # thread between the enqueue operation and the addition of the
+                # operation to `due`. This would cause an exception on the
+                # channel spin thread when the operation completes and no
+                # corresponding operation would be present in state.due.
+                # Note that, since `condition` is held through this block, there is
+                # no data race on `due`.
+                self._state.due.add(cygrpc.OperationType.receive_message)
+                operating = self._call.operate(
+                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
+                )
+                if not operating:
+                    self._state.due.remove(cygrpc.OperationType.receive_message)
+            elif self._state.code is grpc.StatusCode.OK:
+                raise StopIteration()
+            else:
+                raise self
+        return self._next_response()
+
+    def debug_error_string(self) -> Optional[str]:
+        with self._state.condition:
+            if self._state.debug_error_string is None:
+                raise grpc.experimental.UsageError(
+                    "Cannot get debug error string until RPC is completed."
+                )
+            return _common.decode(self._state.debug_error_string)
+
+
+class _MultiThreadedRendezvous(
+    _Rendezvous, grpc.Call, grpc.Future
+):  # pylint: disable=too-many-ancestors
+    """An RPC iterator that depends on a channel spin thread.
+
+    This iterator relies upon a per-channel thread running in the background,
+    dequeueing events from the completion queue, and notifying threads waiting
+    on the threading.Condition object in the _RPCState object.
+
+    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
+    and to mediate a bidirection streaming RPC.
+    """
+
+    _state: _RPCState
+
+    def initial_metadata(self) -> Optional[MetadataType]:
+        """See grpc.Call.initial_metadata"""
+        with self._state.condition:
+
+            def _done():
+                return self._state.initial_metadata is not None
+
+            _common.wait(self._state.condition.wait, _done)
+            return self._state.initial_metadata
+
+    def trailing_metadata(self) -> Optional[MetadataType]:
+        """See grpc.Call.trailing_metadata"""
+        with self._state.condition:
+
+            def _done():
+                return self._state.trailing_metadata is not None
+
+            _common.wait(self._state.condition.wait, _done)
+            return self._state.trailing_metadata
+
+    def code(self) -> Optional[grpc.StatusCode]:
+        """See grpc.Call.code"""
+        with self._state.condition:
+
+            def _done():
+                return self._state.code is not None
+
+            _common.wait(self._state.condition.wait, _done)
+            return self._state.code
+
+    def details(self) -> Optional[str]:
+        """See grpc.Call.details"""
+        with self._state.condition:
+
+            def _done():
+                return self._state.details is not None
+
+            _common.wait(self._state.condition.wait, _done)
+            return _common.decode(self._state.details)
+
+    def debug_error_string(self) -> Optional[str]:
+        with self._state.condition:
+
+            def _done():
+                return self._state.debug_error_string is not None
+
+            _common.wait(self._state.condition.wait, _done)
+            return _common.decode(self._state.debug_error_string)
+
+    def cancelled(self) -> bool:
+        with self._state.condition:
+            return self._state.cancelled
+
+    def running(self) -> bool:
+        with self._state.condition:
+            return self._state.code is None
+
+    def done(self) -> bool:
+        with self._state.condition:
+            return self._state.code is not None
+
+    def _is_complete(self) -> bool:
+        return self._state.code is not None
+
+    def result(self, timeout: Optional[float] = None) -> Any:
+        """Returns the result of the computation or raises its exception.
+
+        See grpc.Future.result for the full API contract.
+        """
+        with self._state.condition:
+            timed_out = _common.wait(
+                self._state.condition.wait, self._is_complete, timeout=timeout
+            )
+            if timed_out:
+                raise grpc.FutureTimeoutError()
+            else:
+                if self._state.code is grpc.StatusCode.OK:
+                    return self._state.response
+                elif self._state.cancelled:
+                    raise grpc.FutureCancelledError()
+                else:
+                    raise self
+
+    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
+        """Return the exception raised by the computation.
+
+        See grpc.Future.exception for the full API contract.
+        """
+        with self._state.condition:
+            timed_out = _common.wait(
+                self._state.condition.wait, self._is_complete, timeout=timeout
+            )
+            if timed_out:
+                raise grpc.FutureTimeoutError()
+            else:
+                if self._state.code is grpc.StatusCode.OK:
+                    return None
+                elif self._state.cancelled:
+                    raise grpc.FutureCancelledError()
+                else:
+                    return self
+
+    def traceback(
+        self, timeout: Optional[float] = None
+    ) -> Optional[types.TracebackType]:
+        """Access the traceback of the exception raised by the computation.
+
+        See grpc.future.traceback for the full API contract.
+        """
+        with self._state.condition:
+            timed_out = _common.wait(
+                self._state.condition.wait, self._is_complete, timeout=timeout
+            )
+            if timed_out:
+                raise grpc.FutureTimeoutError()
+            else:
+                if self._state.code is grpc.StatusCode.OK:
+                    return None
+                elif self._state.cancelled:
+                    raise grpc.FutureCancelledError()
+                else:
+                    try:
+                        raise self
+                    except grpc.RpcError:
+                        return sys.exc_info()[2]
+
+    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
+        with self._state.condition:
+            if self._state.code is None:
+                self._state.callbacks.append(functools.partial(fn, self))
+                return
+
+        fn(self)
+
+    def _next(self) -> Any:
+        with self._state.condition:
+            if self._state.code is None:
+                event_handler = _event_handler(
+                    self._state, self._response_deserializer
+                )
+                self._state.due.add(cygrpc.OperationType.receive_message)
+                operating = self._call.operate(
+                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
+                    event_handler,
+                )
+                if not operating:
+                    self._state.due.remove(cygrpc.OperationType.receive_message)
+            elif self._state.code is grpc.StatusCode.OK:
+                raise StopIteration()
+            else:
+                raise self
+
+            def _response_ready():
+                return self._state.response is not None or (
+                    cygrpc.OperationType.receive_message not in self._state.due
+                    and self._state.code is not None
+                )
+
+            _common.wait(self._state.condition.wait, _response_ready)
+            if self._state.response is not None:
+                response = self._state.response
+                self._state.response = None
+                return response
+            elif cygrpc.OperationType.receive_message not in self._state.due:
+                if self._state.code is grpc.StatusCode.OK:
+                    raise StopIteration()
+                elif self._state.code is not None:
+                    raise self
+
+
+def _start_unary_request(
+    request: Any,
+    timeout: Optional[float],
+    request_serializer: SerializingFunction,
+) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
+    deadline = _deadline(timeout)
+    serialized_request = _common.serialize(request, request_serializer)
+    if serialized_request is None:
+        state = _RPCState(
+            (),
+            (),
+            (),
+            grpc.StatusCode.INTERNAL,
+            "Exception serializing request!",
+        )
+        error = _InactiveRpcError(state)
+        return deadline, None, error
+    else:
+        return deadline, serialized_request, None
+
+
+def _end_unary_response_blocking(
+    state: _RPCState,
+    call: cygrpc.SegregatedCall,
+    with_call: bool,
+    deadline: Optional[float],
+) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
+    if state.code is grpc.StatusCode.OK:
+        if with_call:
+            rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
+            return state.response, rendezvous
+        else:
+            return state.response
+    else:
+        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
+
+
+def _stream_unary_invocation_operations(
+    metadata: Optional[MetadataType], initial_metadata_flags: int
+) -> Sequence[Sequence[cygrpc.Operation]]:
+    return (
+        (
+            cygrpc.SendInitialMetadataOperation(
+                metadata, initial_metadata_flags
+            ),
+            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+        ),
+        (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+    )
+
+
+def _stream_unary_invocation_operations_and_tags(
+    metadata: Optional[MetadataType], initial_metadata_flags: int
+) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
+    return tuple(
+        (
+            operations,
+            None,
+        )
+        for operations in _stream_unary_invocation_operations(
+            metadata, initial_metadata_flags
+        )
+    )
+
+
+def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
+    parent_deadline = cygrpc.get_deadline_from_context()
+    if parent_deadline is None and user_deadline is None:
+        return None
+    elif parent_deadline is not None and user_deadline is None:
+        return parent_deadline
+    elif user_deadline is not None and parent_deadline is None:
+        return user_deadline
+    else:
+        return min(parent_deadline, user_deadline)
+
+
+class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
+    _channel: cygrpc.Channel
+    _managed_call: IntegratedCallFactory
+    _method: bytes
+    _target: bytes
+    _request_serializer: Optional[SerializingFunction]
+    _response_deserializer: Optional[DeserializingFunction]
+    _context: Any
+    _registered_call_handle: Optional[int]
+
+    __slots__ = [
+        "_channel",
+        "_managed_call",
+        "_method",
+        "_target",
+        "_request_serializer",
+        "_response_deserializer",
+        "_context",
+    ]
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        channel: cygrpc.Channel,
+        managed_call: IntegratedCallFactory,
+        method: bytes,
+        target: bytes,
+        request_serializer: Optional[SerializingFunction],
+        response_deserializer: Optional[DeserializingFunction],
+        _registered_call_handle: Optional[int],
+    ):
+        self._channel = channel
+        self._managed_call = managed_call
+        self._method = method
+        self._target = target
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+        self._context = cygrpc.build_census_context()
+        self._registered_call_handle = _registered_call_handle
+
+    def _prepare(
+        self,
+        request: Any,
+        timeout: Optional[float],
+        metadata: Optional[MetadataType],
+        wait_for_ready: Optional[bool],
+        compression: Optional[grpc.Compression],
+    ) -> Tuple[
+        Optional[_RPCState],
+        Optional[Sequence[cygrpc.Operation]],
+        Optional[float],
+        Optional[grpc.RpcError],
+    ]:
+        deadline, serialized_request, rendezvous = _start_unary_request(
+            request, timeout, self._request_serializer
+        )
+        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+            wait_for_ready
+        )
+        augmented_metadata = _compression.augment_metadata(
+            metadata, compression
+        )
+        if serialized_request is None:
+            return None, None, None, rendezvous
+        else:
+            state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
+            operations = (
+                cygrpc.SendInitialMetadataOperation(
+                    augmented_metadata, initial_metadata_flags
+                ),
+                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
+                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+                cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+            )
+            return state, operations, deadline, None
+
+    def _blocking(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
+        state, operations, deadline, rendezvous = self._prepare(
+            request, timeout, metadata, wait_for_ready, compression
+        )
+        if state is None:
+            raise rendezvous  # pylint: disable-msg=raising-bad-type
+        else:
+            state.rpc_start_time = time.perf_counter()
+            state.method = _common.decode(self._method)
+            state.target = _common.decode(self._target)
+            call = self._channel.segregated_call(
+                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
+                self._method,
+                None,
+                _determine_deadline(deadline),
+                metadata,
+                None if credentials is None else credentials._credentials,
+                (
+                    (
+                        operations,
+                        None,
+                    ),
+                ),
+                self._context,
+                self._registered_call_handle,
+            )
+            event = call.next_event()
+            _handle_event(event, state, self._response_deserializer)
+            return state, call
+
+    def __call__(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Any:
+        (
+            state,
+            call,
+        ) = self._blocking(
+            request, timeout, metadata, credentials, wait_for_ready, compression
+        )
+        return _end_unary_response_blocking(state, call, False, None)
+
+    def with_call(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Tuple[Any, grpc.Call]:
+        (
+            state,
+            call,
+        ) = self._blocking(
+            request, timeout, metadata, credentials, wait_for_ready, compression
+        )
+        return _end_unary_response_blocking(state, call, True, None)
+
+    def future(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> _MultiThreadedRendezvous:
+        state, operations, deadline, rendezvous = self._prepare(
+            request, timeout, metadata, wait_for_ready, compression
+        )
+        if state is None:
+            raise rendezvous  # pylint: disable-msg=raising-bad-type
+        else:
+            event_handler = _event_handler(state, self._response_deserializer)
+            state.rpc_start_time = time.perf_counter()
+            state.method = _common.decode(self._method)
+            state.target = _common.decode(self._target)
+            call = self._managed_call(
+                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
+                self._method,
+                None,
+                deadline,
+                metadata,
+                None if credentials is None else credentials._credentials,
+                (operations,),
+                event_handler,
+                self._context,
+                self._registered_call_handle,
+            )
+            return _MultiThreadedRendezvous(
+                state, call, self._response_deserializer, deadline
+            )
+
+
+class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
+    _channel: cygrpc.Channel
+    _method: bytes
+    _target: bytes
+    _request_serializer: Optional[SerializingFunction]
+    _response_deserializer: Optional[DeserializingFunction]
+    _context: Any
+    _registered_call_handle: Optional[int]
+
+    __slots__ = [
+        "_channel",
+        "_method",
+        "_target",
+        "_request_serializer",
+        "_response_deserializer",
+        "_context",
+    ]
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        channel: cygrpc.Channel,
+        method: bytes,
+        target: bytes,
+        request_serializer: SerializingFunction,
+        response_deserializer: DeserializingFunction,
+        _registered_call_handle: Optional[int],
+    ):
+        self._channel = channel
+        self._method = method
+        self._target = target
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+        self._context = cygrpc.build_census_context()
+        self._registered_call_handle = _registered_call_handle
+
+    def __call__(  # pylint: disable=too-many-locals
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> _SingleThreadedRendezvous:
+        deadline = _deadline(timeout)
+        serialized_request = _common.serialize(
+            request, self._request_serializer
+        )
+        if serialized_request is None:
+            state = _RPCState(
+                (),
+                (),
+                (),
+                grpc.StatusCode.INTERNAL,
+                "Exception serializing request!",
+            )
+            raise _InactiveRpcError(state)
+
+        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
+        call_credentials = (
+            None if credentials is None else credentials._credentials
+        )
+        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+            wait_for_ready
+        )
+        augmented_metadata = _compression.augment_metadata(
+            metadata, compression
+        )
+        operations = (
+            (
+                cygrpc.SendInitialMetadataOperation(
+                    augmented_metadata, initial_metadata_flags
+                ),
+                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
+                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+            ),
+            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
+            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+        )
+        operations_and_tags = tuple((ops, None) for ops in operations)
+        state.rpc_start_time = time.perf_counter()
+        state.method = _common.decode(self._method)
+        state.target = _common.decode(self._target)
+        call = self._channel.segregated_call(
+            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
+            self._method,
+            None,
+            _determine_deadline(deadline),
+            metadata,
+            call_credentials,
+            operations_and_tags,
+            self._context,
+            self._registered_call_handle,
+        )
+        return _SingleThreadedRendezvous(
+            state, call, self._response_deserializer, deadline
+        )
+
+
+class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
+    _channel: cygrpc.Channel
+    _managed_call: IntegratedCallFactory
+    _method: bytes
+    _target: bytes
+    _request_serializer: Optional[SerializingFunction]
+    _response_deserializer: Optional[DeserializingFunction]
+    _context: Any
+    _registered_call_handle: Optional[int]
+
+    __slots__ = [
+        "_channel",
+        "_managed_call",
+        "_method",
+        "_target",
+        "_request_serializer",
+        "_response_deserializer",
+        "_context",
+    ]
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        channel: cygrpc.Channel,
+        managed_call: IntegratedCallFactory,
+        method: bytes,
+        target: bytes,
+        request_serializer: SerializingFunction,
+        response_deserializer: DeserializingFunction,
+        _registered_call_handle: Optional[int],
+    ):
+        self._channel = channel
+        self._managed_call = managed_call
+        self._method = method
+        self._target = target
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+        self._context = cygrpc.build_census_context()
+        self._registered_call_handle = _registered_call_handle
+
+    def __call__(  # pylint: disable=too-many-locals
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> _MultiThreadedRendezvous:
+        deadline, serialized_request, rendezvous = _start_unary_request(
+            request, timeout, self._request_serializer
+        )
+        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+            wait_for_ready
+        )
+        if serialized_request is None:
+            raise rendezvous  # pylint: disable-msg=raising-bad-type
+        else:
+            augmented_metadata = _compression.augment_metadata(
+                metadata, compression
+            )
+            state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
+            operations = (
+                (
+                    cygrpc.SendInitialMetadataOperation(
+                        augmented_metadata, initial_metadata_flags
+                    ),
+                    cygrpc.SendMessageOperation(
+                        serialized_request, _EMPTY_FLAGS
+                    ),
+                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+                    cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+                ),
+                (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+            )
+            state.rpc_start_time = time.perf_counter()
+            state.method = _common.decode(self._method)
+            state.target = _common.decode(self._target)
+            call = self._managed_call(
+                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
+                self._method,
+                None,
+                _determine_deadline(deadline),
+                metadata,
+                None if credentials is None else credentials._credentials,
+                operations,
+                _event_handler(state, self._response_deserializer),
+                self._context,
+                self._registered_call_handle,
+            )
+            return _MultiThreadedRendezvous(
+                state, call, self._response_deserializer, deadline
+            )
+
+
+class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
+    _channel: cygrpc.Channel
+    _managed_call: IntegratedCallFactory
+    _method: bytes
+    _target: bytes
+    _request_serializer: Optional[SerializingFunction]
+    _response_deserializer: Optional[DeserializingFunction]
+    _context: Any
+    _registered_call_handle: Optional[int]
+
+    __slots__ = [
+        "_channel",
+        "_managed_call",
+        "_method",
+        "_target",
+        "_request_serializer",
+        "_response_deserializer",
+        "_context",
+    ]
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        channel: cygrpc.Channel,
+        managed_call: IntegratedCallFactory,
+        method: bytes,
+        target: bytes,
+        request_serializer: Optional[SerializingFunction],
+        response_deserializer: Optional[DeserializingFunction],
+        _registered_call_handle: Optional[int],
+    ):
+        self._channel = channel
+        self._managed_call = managed_call
+        self._method = method
+        self._target = target
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+        self._context = cygrpc.build_census_context()
+        self._registered_call_handle = _registered_call_handle
+
+    def _blocking(
+        self,
+        request_iterator: Iterator,
+        timeout: Optional[float],
+        metadata: Optional[MetadataType],
+        credentials: Optional[grpc.CallCredentials],
+        wait_for_ready: Optional[bool],
+        compression: Optional[grpc.Compression],
+    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
+        deadline = _deadline(timeout)
+        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
+        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+            wait_for_ready
+        )
+        augmented_metadata = _compression.augment_metadata(
+            metadata, compression
+        )
+        state.rpc_start_time = time.perf_counter()
+        state.method = _common.decode(self._method)
+        state.target = _common.decode(self._target)
+        call = self._channel.segregated_call(
+            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
+            self._method,
+            None,
+            _determine_deadline(deadline),
+            augmented_metadata,
+            None if credentials is None else credentials._credentials,
+            _stream_unary_invocation_operations_and_tags(
+                augmented_metadata, initial_metadata_flags
+            ),
+            self._context,
+            self._registered_call_handle,
+        )
+        _consume_request_iterator(
+            request_iterator, state, call, self._request_serializer, None
+        )
+        while True:
+            event = call.next_event()
+            with state.condition:
+                _handle_event(event, state, self._response_deserializer)
+                state.condition.notify_all()
+                if not state.due:
+                    break
+        return state, call
+
+    def __call__(
+        self,
+        request_iterator: Iterator,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Any:
+        (
+            state,
+            call,
+        ) = self._blocking(
+            request_iterator,
+            timeout,
+            metadata,
+            credentials,
+            wait_for_ready,
+            compression,
+        )
+        return _end_unary_response_blocking(state, call, False, None)
+
+    def with_call(
+        self,
+        request_iterator: Iterator,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Tuple[Any, grpc.Call]:
+        (
+            state,
+            call,
+        ) = self._blocking(
+            request_iterator,
+            timeout,
+            metadata,
+            credentials,
+            wait_for_ready,
+            compression,
+        )
+        return _end_unary_response_blocking(state, call, True, None)
+
+    def future(
+        self,
+        request_iterator: Iterator,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> _MultiThreadedRendezvous:
+        deadline = _deadline(timeout)
+        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
+        event_handler = _event_handler(state, self._response_deserializer)
+        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+            wait_for_ready
+        )
+        augmented_metadata = _compression.augment_metadata(
+            metadata, compression
+        )
+        state.rpc_start_time = time.perf_counter()
+        state.method = _common.decode(self._method)
+        state.target = _common.decode(self._target)
+        call = self._managed_call(
+            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
+            self._method,
+            None,
+            deadline,
+            augmented_metadata,
+            None if credentials is None else credentials._credentials,
+            _stream_unary_invocation_operations(
+                metadata, initial_metadata_flags
+            ),
+            event_handler,
+            self._context,
+            self._registered_call_handle,
+        )
+        _consume_request_iterator(
+            request_iterator,
+            state,
+            call,
+            self._request_serializer,
+            event_handler,
+        )
+        return _MultiThreadedRendezvous(
+            state, call, self._response_deserializer, deadline
+        )
+
+
+class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
+    _channel: cygrpc.Channel
+    _managed_call: IntegratedCallFactory
+    _method: bytes
+    _target: bytes
+    _request_serializer: Optional[SerializingFunction]
+    _response_deserializer: Optional[DeserializingFunction]
+    _context: Any
+    _registered_call_handle: Optional[int]
+
+    __slots__ = [
+        "_channel",
+        "_managed_call",
+        "_method",
+        "_target",
+        "_request_serializer",
+        "_response_deserializer",
+        "_context",
+    ]
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        channel: cygrpc.Channel,
+        managed_call: IntegratedCallFactory,
+        method: bytes,
+        target: bytes,
+        request_serializer: Optional[SerializingFunction],
+        response_deserializer: Optional[DeserializingFunction],
+        _registered_call_handle: Optional[int],
+    ):
+        self._channel = channel
+        self._managed_call = managed_call
+        self._method = method
+        self._target = target
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+        self._context = cygrpc.build_census_context()
+        self._registered_call_handle = _registered_call_handle
+
+    def __call__(
+        self,
+        request_iterator: Iterator,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> _MultiThreadedRendezvous:
+        deadline = _deadline(timeout)
+        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
+        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+            wait_for_ready
+        )
+        augmented_metadata = _compression.augment_metadata(
+            metadata, compression
+        )
+        operations = (
+            (
+                cygrpc.SendInitialMetadataOperation(
+                    augmented_metadata, initial_metadata_flags
+                ),
+                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+            ),
+            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+        )
+        event_handler = _event_handler(state, self._response_deserializer)
+        state.rpc_start_time = time.perf_counter()
+        state.method = _common.decode(self._method)
+        state.target = _common.decode(self._target)
+        call = self._managed_call(
+            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
+            self._method,
+            None,
+            _determine_deadline(deadline),
+            augmented_metadata,
+            None if credentials is None else credentials._credentials,
+            operations,
+            event_handler,
+            self._context,
+            self._registered_call_handle,
+        )
+        _consume_request_iterator(
+            request_iterator,
+            state,
+            call,
+            self._request_serializer,
+            event_handler,
+        )
+        return _MultiThreadedRendezvous(
+            state, call, self._response_deserializer, deadline
+        )
+
+
+class _InitialMetadataFlags(int):
+    """Stores immutable initial metadata flags"""
+
+    def __new__(cls, value: int = _EMPTY_FLAGS):
+        value &= cygrpc.InitialMetadataFlags.used_mask
+        return super(_InitialMetadataFlags, cls).__new__(cls, value)
+
+    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
+        if wait_for_ready is not None:
+            if wait_for_ready:
+                return self.__class__(
+                    self
+                    | cygrpc.InitialMetadataFlags.wait_for_ready
+                    | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
+                )
+            elif not wait_for_ready:
+                return self.__class__(
+                    self & ~cygrpc.InitialMetadataFlags.wait_for_ready
+                    | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
+                )
+        return self
+
+
+class _ChannelCallState(object):
+    channel: cygrpc.Channel
+    managed_calls: int
+    threading: bool
+
+    def __init__(self, channel: cygrpc.Channel):
+        self.lock = threading.Lock()
+        self.channel = channel
+        self.managed_calls = 0
+        self.threading = False
+
+    def reset_postfork_child(self) -> None:
+        self.managed_calls = 0
+
+    def __del__(self):
+        try:
+            self.channel.close(
+                cygrpc.StatusCode.cancelled, "Channel deallocated!"
+            )
+        except (TypeError, AttributeError):
+            pass
+
+
+def _run_channel_spin_thread(state: _ChannelCallState) -> None:
+    def channel_spin():
+        while True:
+            cygrpc.block_if_fork_in_progress(state)
+            event = state.channel.next_call_event()
+            if event.completion_type == cygrpc.CompletionType.queue_timeout:
+                continue
+            call_completed = event.tag(event)
+            if call_completed:
+                with state.lock:
+                    state.managed_calls -= 1
+                    if state.managed_calls == 0:
+                        return
+
+    channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
+    channel_spin_thread.setDaemon(True)
+    channel_spin_thread.start()
+
+
+def _channel_managed_call_management(state: _ChannelCallState):
+    # pylint: disable=too-many-arguments
+    def create(
+        flags: int,
+        method: bytes,
+        host: Optional[str],
+        deadline: Optional[float],
+        metadata: Optional[MetadataType],
+        credentials: Optional[cygrpc.CallCredentials],
+        operations: Sequence[Sequence[cygrpc.Operation]],
+        event_handler: UserTag,
+        context: Any,
+        _registered_call_handle: Optional[int],
+    ) -> cygrpc.IntegratedCall:
+        """Creates a cygrpc.IntegratedCall.
+
+        Args:
+          flags: An integer bitfield of call flags.
+          method: The RPC method.
+          host: A host string for the created call.
+          deadline: A float to be the deadline of the created call or None if
+            the call is to have an infinite deadline.
+          metadata: The metadata for the call or None.
+          credentials: A cygrpc.CallCredentials or None.
+          operations: A sequence of sequences of cygrpc.Operations to be
+            started on the call.
+          event_handler: A behavior to call to handle the events resultant from
+            the operations on the call.
+          context: Context object for distributed tracing.
+          _registered_call_handle: An int representing the call handle of the
+            method, or None if the method is not registered.
+        Returns:
+          A cygrpc.IntegratedCall with which to conduct an RPC.
+        """
+        operations_and_tags = tuple(
+            (
+                operation,
+                event_handler,
+            )
+            for operation in operations
+        )
+        with state.lock:
+            call = state.channel.integrated_call(
+                flags,
+                method,
+                host,
+                deadline,
+                metadata,
+                credentials,
+                operations_and_tags,
+                context,
+                _registered_call_handle,
+            )
+            if state.managed_calls == 0:
+                state.managed_calls = 1
+                _run_channel_spin_thread(state)
+            else:
+                state.managed_calls += 1
+            return call
+
+    return create
+
+
+class _ChannelConnectivityState(object):
+    lock: threading.RLock
+    channel: grpc.Channel
+    polling: bool
+    connectivity: grpc.ChannelConnectivity
+    try_to_connect: bool
+    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
+    callbacks_and_connectivities: List[
+        Sequence[
+            Union[
+                Callable[[grpc.ChannelConnectivity], None],
+                Optional[grpc.ChannelConnectivity],
+            ]
+        ]
+    ]
+    delivering: bool
+
+    def __init__(self, channel: grpc.Channel):
+        self.lock = threading.RLock()
+        self.channel = channel
+        self.polling = False
+        self.connectivity = None
+        self.try_to_connect = False
+        self.callbacks_and_connectivities = []
+        self.delivering = False
+
+    def reset_postfork_child(self) -> None:
+        self.polling = False
+        self.connectivity = None
+        self.try_to_connect = False
+        self.callbacks_and_connectivities = []
+        self.delivering = False
+
+
+def _deliveries(
+    state: _ChannelConnectivityState,
+) -> List[Callable[[grpc.ChannelConnectivity], None]]:
+    callbacks_needing_update = []
+    for callback_and_connectivity in state.callbacks_and_connectivities:
+        (
+            callback,
+            callback_connectivity,
+        ) = callback_and_connectivity
+        if callback_connectivity is not state.connectivity:
+            callbacks_needing_update.append(callback)
+            callback_and_connectivity[1] = state.connectivity
+    return callbacks_needing_update
+
+
+def _deliver(
+    state: _ChannelConnectivityState,
+    initial_connectivity: grpc.ChannelConnectivity,
+    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
+) -> None:
+    connectivity = initial_connectivity
+    callbacks = initial_callbacks
+    while True:
+        for callback in callbacks:
+            cygrpc.block_if_fork_in_progress(state)
+            try:
+                callback(connectivity)
+            except Exception:  # pylint: disable=broad-except
+                _LOGGER.exception(
+                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
+                )
+        with state.lock:
+            callbacks = _deliveries(state)
+            if callbacks:
+                connectivity = state.connectivity
+            else:
+                state.delivering = False
+                return
+
+
+def _spawn_delivery(
+    state: _ChannelConnectivityState,
+    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
+) -> None:
+    delivering_thread = cygrpc.ForkManagedThread(
+        target=_deliver,
+        args=(
+            state,
+            state.connectivity,
+            callbacks,
+        ),
+    )
+    delivering_thread.setDaemon(True)
+    delivering_thread.start()
+    state.delivering = True
+
+
+# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
+def _poll_connectivity(
+    state: _ChannelConnectivityState,
+    channel: grpc.Channel,
+    initial_try_to_connect: bool,
+) -> None:
+    try_to_connect = initial_try_to_connect
+    connectivity = channel.check_connectivity_state(try_to_connect)
+    with state.lock:
+        state.connectivity = (
+            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+                connectivity
+            ]
+        )
+        callbacks = tuple(
+            callback for callback, _ in state.callbacks_and_connectivities
+        )
+        for callback_and_connectivity in state.callbacks_and_connectivities:
+            callback_and_connectivity[1] = state.connectivity
+        if callbacks:
+            _spawn_delivery(state, callbacks)
+    while True:
+        event = channel.watch_connectivity_state(
+            connectivity, time.time() + 0.2
+        )
+        cygrpc.block_if_fork_in_progress(state)
+        with state.lock:
+            if (
+                not state.callbacks_and_connectivities
+                and not state.try_to_connect
+            ):
+                state.polling = False
+                state.connectivity = None
+                break
+            try_to_connect = state.try_to_connect
+            state.try_to_connect = False
+        if event.success or try_to_connect:
+            connectivity = channel.check_connectivity_state(try_to_connect)
+            with state.lock:
+                state.connectivity = (
+                    _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+                        connectivity
+                    ]
+                )
+                if not state.delivering:
+                    callbacks = _deliveries(state)
+                    if callbacks:
+                        _spawn_delivery(state, callbacks)
+
+
+def _subscribe(
+    state: _ChannelConnectivityState,
+    callback: Callable[[grpc.ChannelConnectivity], None],
+    try_to_connect: bool,
+) -> None:
+    with state.lock:
+        if not state.callbacks_and_connectivities and not state.polling:
+            polling_thread = cygrpc.ForkManagedThread(
+                target=_poll_connectivity,
+                args=(state, state.channel, bool(try_to_connect)),
+            )
+            polling_thread.setDaemon(True)
+            polling_thread.start()
+            state.polling = True
+            state.callbacks_and_connectivities.append([callback, None])
+        elif not state.delivering and state.connectivity is not None:
+            _spawn_delivery(state, (callback,))
+            state.try_to_connect |= bool(try_to_connect)
+            state.callbacks_and_connectivities.append(
+                [callback, state.connectivity]
+            )
+        else:
+            state.try_to_connect |= bool(try_to_connect)
+            state.callbacks_and_connectivities.append([callback, None])
+
+
+def _unsubscribe(
+    state: _ChannelConnectivityState,
+    callback: Callable[[grpc.ChannelConnectivity], None],
+) -> None:
+    with state.lock:
+        for index, (subscribed_callback, unused_connectivity) in enumerate(
+            state.callbacks_and_connectivities
+        ):
+            if callback == subscribed_callback:
+                state.callbacks_and_connectivities.pop(index)
+                break
+
+
+def _augment_options(
+    base_options: Sequence[ChannelArgumentType],
+    compression: Optional[grpc.Compression],
+) -> Sequence[ChannelArgumentType]:
+    compression_option = _compression.create_channel_option(compression)
+    return (
+        tuple(base_options)
+        + compression_option
+        + (
+            (
+                cygrpc.ChannelArgKey.primary_user_agent_string,
+                _USER_AGENT,
+            ),
+        )
+    )
+
+
+def _separate_channel_options(
+    options: Sequence[ChannelArgumentType],
+) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
+    """Separates core channel options from Python channel options."""
+    core_options = []
+    python_options = []
+    for pair in options:
+        if (
+            pair[0]
+            == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
+        ):
+            python_options.append(pair)
+        else:
+            core_options.append(pair)
+    return python_options, core_options
+
+
+class Channel(grpc.Channel):
+    """A cygrpc.Channel-backed implementation of grpc.Channel."""
+
+    _single_threaded_unary_stream: bool
+    _channel: cygrpc.Channel
+    _call_state: _ChannelCallState
+    _connectivity_state: _ChannelConnectivityState
+    _target: str
+    _registered_call_handles: Dict[str, int]
+
+    def __init__(
+        self,
+        target: str,
+        options: Sequence[ChannelArgumentType],
+        credentials: Optional[grpc.ChannelCredentials],
+        compression: Optional[grpc.Compression],
+    ):
+        """Constructor.
+
+        Args:
+          target: The target to which to connect.
+          options: Configuration options for the channel.
+          credentials: A cygrpc.ChannelCredentials or None.
+          compression: An optional value indicating the compression method to be
+            used over the lifetime of the channel.
+        """
+        python_options, core_options = _separate_channel_options(options)
+        self._single_threaded_unary_stream = (
+            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
+        )
+        self._process_python_options(python_options)
+        self._channel = cygrpc.Channel(
+            _common.encode(target),
+            _augment_options(core_options, compression),
+            credentials,
+        )
+        self._target = target
+        self._call_state = _ChannelCallState(self._channel)
+        self._connectivity_state = _ChannelConnectivityState(self._channel)
+        cygrpc.fork_register_channel(self)
+        if cygrpc.g_gevent_activated:
+            cygrpc.gevent_increment_channel_count()
+
+    def _get_registered_call_handle(self, method: str) -> int:
+        """
+        Get the registered call handle for a method.
+
+        This is a semi-private method. It is intended for use only by gRPC generated code.
+
+        This method is not thread-safe.
+
+        Args:
+          method: Required, the method name for the RPC.
+
+        Returns:
+          The registered call handle pointer in the form of a Python Long.
+        """
+        return self._channel.get_registered_call_handle(_common.encode(method))
+
+    def _process_python_options(
+        self, python_options: Sequence[ChannelArgumentType]
+    ) -> None:
+        """Sets channel attributes according to python-only channel options."""
+        for pair in python_options:
+            if (
+                pair[0]
+                == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
+            ):
+                self._single_threaded_unary_stream = True
+
+    def subscribe(
+        self,
+        callback: Callable[[grpc.ChannelConnectivity], None],
+        try_to_connect: Optional[bool] = None,
+    ) -> None:
+        _subscribe(self._connectivity_state, callback, try_to_connect)
+
+    def unsubscribe(
+        self, callback: Callable[[grpc.ChannelConnectivity], None]
+    ) -> None:
+        _unsubscribe(self._connectivity_state, callback)
+
+    # pylint: disable=arguments-differ
+    def unary_unary(
+        self,
+        method: str,
+        request_serializer: Optional[SerializingFunction] = None,
+        response_deserializer: Optional[DeserializingFunction] = None,
+        _registered_method: Optional[bool] = False,
+    ) -> grpc.UnaryUnaryMultiCallable:
+        _registered_call_handle = None
+        if _registered_method:
+            _registered_call_handle = self._get_registered_call_handle(method)
+        return _UnaryUnaryMultiCallable(
+            self._channel,
+            _channel_managed_call_management(self._call_state),
+            _common.encode(method),
+            _common.encode(self._target),
+            request_serializer,
+            response_deserializer,
+            _registered_call_handle,
+        )
+
+    # pylint: disable=arguments-differ
+    def unary_stream(
+        self,
+        method: str,
+        request_serializer: Optional[SerializingFunction] = None,
+        response_deserializer: Optional[DeserializingFunction] = None,
+        _registered_method: Optional[bool] = False,
+    ) -> grpc.UnaryStreamMultiCallable:
+        _registered_call_handle = None
+        if _registered_method:
+            _registered_call_handle = self._get_registered_call_handle(method)
+        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
+        # on a single Python thread results in an appreciable speed-up. However,
+        # due to slight differences in capability, the multi-threaded variant
+        # remains the default.
+        if self._single_threaded_unary_stream:
+            return _SingleThreadedUnaryStreamMultiCallable(
+                self._channel,
+                _common.encode(method),
+                _common.encode(self._target),
+                request_serializer,
+                response_deserializer,
+                _registered_call_handle,
+            )
+        else:
+            return _UnaryStreamMultiCallable(
+                self._channel,
+                _channel_managed_call_management(self._call_state),
+                _common.encode(method),
+                _common.encode(self._target),
+                request_serializer,
+                response_deserializer,
+                _registered_call_handle,
+            )
+
+    # pylint: disable=arguments-differ
+    def stream_unary(
+        self,
+        method: str,
+        request_serializer: Optional[SerializingFunction] = None,
+        response_deserializer: Optional[DeserializingFunction] = None,
+        _registered_method: Optional[bool] = False,
+    ) -> grpc.StreamUnaryMultiCallable:
+        _registered_call_handle = None
+        if _registered_method:
+            _registered_call_handle = self._get_registered_call_handle(method)
+        return _StreamUnaryMultiCallable(
+            self._channel,
+            _channel_managed_call_management(self._call_state),
+            _common.encode(method),
+            _common.encode(self._target),
+            request_serializer,
+            response_deserializer,
+            _registered_call_handle,
+        )
+
+    # pylint: disable=arguments-differ
+    def stream_stream(
+        self,
+        method: str,
+        request_serializer: Optional[SerializingFunction] = None,
+        response_deserializer: Optional[DeserializingFunction] = None,
+        _registered_method: Optional[bool] = False,
+    ) -> grpc.StreamStreamMultiCallable:
+        _registered_call_handle = None
+        if _registered_method:
+            _registered_call_handle = self._get_registered_call_handle(method)
+        return _StreamStreamMultiCallable(
+            self._channel,
+            _channel_managed_call_management(self._call_state),
+            _common.encode(method),
+            _common.encode(self._target),
+            request_serializer,
+            response_deserializer,
+            _registered_call_handle,
+        )
+
+    def _unsubscribe_all(self) -> None:
+        state = self._connectivity_state
+        if state:
+            with state.lock:
+                del state.callbacks_and_connectivities[:]
+
+    def _close(self) -> None:
+        self._unsubscribe_all()
+        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
+        cygrpc.fork_unregister_channel(self)
+        if cygrpc.g_gevent_activated:
+            cygrpc.gevent_decrement_channel_count()
+
+    def _close_on_fork(self) -> None:
+        self._unsubscribe_all()
+        self._channel.close_on_fork(
+            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
+        )
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self._close()
+        return False
+
+    def close(self) -> None:
+        self._close()
+
+    def __del__(self):
+        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
+        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
+        # here (or more likely, call self._close() here). We don't do this today
+        # because many valid use cases today allow the channel to be deleted
+        # immediately after stubs are created. After a sufficient period of time
+        # has passed for all users to be trusted to freeze out to their channels
+        # for as long as they are in use and to close them after using them,
+        # then deletion of this grpc._channel.Channel instance can be made to
+        # effect closure of the underlying cygrpc.Channel instance.
+        try:
+            self._unsubscribe_all()
+        except:  # pylint: disable=bare-except
+            # Exceptions in __del__ are ignored by Python anyway, but they can
+            # keep spamming logs.  Just silence them.
+            pass