about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/grpc/_interceptor.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/grpc/_interceptor.py')
-rw-r--r--.venv/lib/python3.12/site-packages/grpc/_interceptor.py813
1 files changed, 813 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/grpc/_interceptor.py b/.venv/lib/python3.12/site-packages/grpc/_interceptor.py
new file mode 100644
index 00000000..94abafeb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/grpc/_interceptor.py
@@ -0,0 +1,813 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Implementation of gRPC Python interceptors."""
+
+import collections
+import sys
+import types
+from typing import Any, Callable, Optional, Sequence, Tuple, Union
+
+import grpc
+
+from ._typing import DeserializingFunction
+from ._typing import DoneCallbackType
+from ._typing import MetadataType
+from ._typing import RequestIterableType
+from ._typing import SerializingFunction
+
+
+class _ServicePipeline(object):
+    interceptors: Tuple[grpc.ServerInterceptor]
+
+    def __init__(self, interceptors: Sequence[grpc.ServerInterceptor]):
+        self.interceptors = tuple(interceptors)
+
+    def _continuation(self, thunk: Callable, index: int) -> Callable:
+        return lambda context: self._intercept_at(thunk, index, context)
+
+    def _intercept_at(
+        self, thunk: Callable, index: int, context: grpc.HandlerCallDetails
+    ) -> grpc.RpcMethodHandler:
+        if index < len(self.interceptors):
+            interceptor = self.interceptors[index]
+            thunk = self._continuation(thunk, index + 1)
+            return interceptor.intercept_service(thunk, context)
+        else:
+            return thunk(context)
+
+    def execute(
+        self, thunk: Callable, context: grpc.HandlerCallDetails
+    ) -> grpc.RpcMethodHandler:
+        return self._intercept_at(thunk, 0, context)
+
+
+def service_pipeline(
+    interceptors: Optional[Sequence[grpc.ServerInterceptor]],
+) -> Optional[_ServicePipeline]:
+    return _ServicePipeline(interceptors) if interceptors else None
+
+
+class _ClientCallDetails(
+    collections.namedtuple(
+        "_ClientCallDetails",
+        (
+            "method",
+            "timeout",
+            "metadata",
+            "credentials",
+            "wait_for_ready",
+            "compression",
+        ),
+    ),
+    grpc.ClientCallDetails,
+):
+    pass
+
+
+def _unwrap_client_call_details(
+    call_details: grpc.ClientCallDetails,
+    default_details: grpc.ClientCallDetails,
+) -> Tuple[
+    str, float, MetadataType, grpc.CallCredentials, bool, grpc.Compression
+]:
+    try:
+        method = call_details.method  # pytype: disable=attribute-error
+    except AttributeError:
+        method = default_details.method  # pytype: disable=attribute-error
+
+    try:
+        timeout = call_details.timeout  # pytype: disable=attribute-error
+    except AttributeError:
+        timeout = default_details.timeout  # pytype: disable=attribute-error
+
+    try:
+        metadata = call_details.metadata  # pytype: disable=attribute-error
+    except AttributeError:
+        metadata = default_details.metadata  # pytype: disable=attribute-error
+
+    try:
+        credentials = (
+            call_details.credentials
+        )  # pytype: disable=attribute-error
+    except AttributeError:
+        credentials = (
+            default_details.credentials
+        )  # pytype: disable=attribute-error
+
+    try:
+        wait_for_ready = (
+            call_details.wait_for_ready
+        )  # pytype: disable=attribute-error
+    except AttributeError:
+        wait_for_ready = (
+            default_details.wait_for_ready
+        )  # pytype: disable=attribute-error
+
+    try:
+        compression = (
+            call_details.compression
+        )  # pytype: disable=attribute-error
+    except AttributeError:
+        compression = (
+            default_details.compression
+        )  # pytype: disable=attribute-error
+
+    return method, timeout, metadata, credentials, wait_for_ready, compression
+
+
+class _FailureOutcome(
+    grpc.RpcError, grpc.Future, grpc.Call
+):  # pylint: disable=too-many-ancestors
+    _exception: Exception
+    _traceback: types.TracebackType
+
+    def __init__(self, exception: Exception, traceback: types.TracebackType):
+        super(_FailureOutcome, self).__init__()
+        self._exception = exception
+        self._traceback = traceback
+
+    def initial_metadata(self) -> Optional[MetadataType]:
+        return None
+
+    def trailing_metadata(self) -> Optional[MetadataType]:
+        return None
+
+    def code(self) -> Optional[grpc.StatusCode]:
+        return grpc.StatusCode.INTERNAL
+
+    def details(self) -> Optional[str]:
+        return "Exception raised while intercepting the RPC"
+
+    def cancel(self) -> bool:
+        return False
+
+    def cancelled(self) -> bool:
+        return False
+
+    def is_active(self) -> bool:
+        return False
+
+    def time_remaining(self) -> Optional[float]:
+        return None
+
+    def running(self) -> bool:
+        return False
+
+    def done(self) -> bool:
+        return True
+
+    def result(self, ignored_timeout: Optional[float] = None):
+        raise self._exception
+
+    def exception(
+        self, ignored_timeout: Optional[float] = None
+    ) -> Optional[Exception]:
+        return self._exception
+
+    def traceback(
+        self, ignored_timeout: Optional[float] = None
+    ) -> Optional[types.TracebackType]:
+        return self._traceback
+
+    def add_callback(self, unused_callback) -> bool:
+        return False
+
+    def add_done_callback(self, fn: DoneCallbackType) -> None:
+        fn(self)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        raise self._exception
+
+    def next(self):
+        return self.__next__()
+
+
+class _UnaryOutcome(grpc.Call, grpc.Future):
+    _response: Any
+    _call: grpc.Call
+
+    def __init__(self, response: Any, call: grpc.Call):
+        self._response = response
+        self._call = call
+
+    def initial_metadata(self) -> Optional[MetadataType]:
+        return self._call.initial_metadata()
+
+    def trailing_metadata(self) -> Optional[MetadataType]:
+        return self._call.trailing_metadata()
+
+    def code(self) -> Optional[grpc.StatusCode]:
+        return self._call.code()
+
+    def details(self) -> Optional[str]:
+        return self._call.details()
+
+    def is_active(self) -> bool:
+        return self._call.is_active()
+
+    def time_remaining(self) -> Optional[float]:
+        return self._call.time_remaining()
+
+    def cancel(self) -> bool:
+        return self._call.cancel()
+
+    def add_callback(self, callback) -> bool:
+        return self._call.add_callback(callback)
+
+    def cancelled(self) -> bool:
+        return False
+
+    def running(self) -> bool:
+        return False
+
+    def done(self) -> bool:
+        return True
+
+    def result(self, ignored_timeout: Optional[float] = None):
+        return self._response
+
+    def exception(self, ignored_timeout: Optional[float] = None):
+        return None
+
+    def traceback(self, ignored_timeout: Optional[float] = None):
+        return None
+
+    def add_done_callback(self, fn: DoneCallbackType) -> None:
+        fn(self)
+
+
+class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
+    _thunk: Callable
+    _method: str
+    _interceptor: grpc.UnaryUnaryClientInterceptor
+
+    def __init__(
+        self,
+        thunk: Callable,
+        method: str,
+        interceptor: grpc.UnaryUnaryClientInterceptor,
+    ):
+        self._thunk = thunk
+        self._method = method
+        self._interceptor = interceptor
+
+    def __call__(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Any:
+        response, ignored_call = self._with_call(
+            request,
+            timeout=timeout,
+            metadata=metadata,
+            credentials=credentials,
+            wait_for_ready=wait_for_ready,
+            compression=compression,
+        )
+        return response
+
+    def _with_call(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Tuple[Any, grpc.Call]:
+        client_call_details = _ClientCallDetails(
+            self._method,
+            timeout,
+            metadata,
+            credentials,
+            wait_for_ready,
+            compression,
+        )
+
+        def continuation(new_details, request):
+            (
+                new_method,
+                new_timeout,
+                new_metadata,
+                new_credentials,
+                new_wait_for_ready,
+                new_compression,
+            ) = _unwrap_client_call_details(new_details, client_call_details)
+            try:
+                response, call = self._thunk(new_method).with_call(
+                    request,
+                    timeout=new_timeout,
+                    metadata=new_metadata,
+                    credentials=new_credentials,
+                    wait_for_ready=new_wait_for_ready,
+                    compression=new_compression,
+                )
+                return _UnaryOutcome(response, call)
+            except grpc.RpcError as rpc_error:
+                return rpc_error
+            except Exception as exception:  # pylint:disable=broad-except
+                return _FailureOutcome(exception, sys.exc_info()[2])
+
+        call = self._interceptor.intercept_unary_unary(
+            continuation, client_call_details, request
+        )
+        return call.result(), call
+
+    def with_call(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Tuple[Any, grpc.Call]:
+        return self._with_call(
+            request,
+            timeout=timeout,
+            metadata=metadata,
+            credentials=credentials,
+            wait_for_ready=wait_for_ready,
+            compression=compression,
+        )
+
+    def future(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Any:
+        client_call_details = _ClientCallDetails(
+            self._method,
+            timeout,
+            metadata,
+            credentials,
+            wait_for_ready,
+            compression,
+        )
+
+        def continuation(new_details, request):
+            (
+                new_method,
+                new_timeout,
+                new_metadata,
+                new_credentials,
+                new_wait_for_ready,
+                new_compression,
+            ) = _unwrap_client_call_details(new_details, client_call_details)
+            return self._thunk(new_method).future(
+                request,
+                timeout=new_timeout,
+                metadata=new_metadata,
+                credentials=new_credentials,
+                wait_for_ready=new_wait_for_ready,
+                compression=new_compression,
+            )
+
+        try:
+            return self._interceptor.intercept_unary_unary(
+                continuation, client_call_details, request
+            )
+        except Exception as exception:  # pylint:disable=broad-except
+            return _FailureOutcome(exception, sys.exc_info()[2])
+
+
+class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
+    _thunk: Callable
+    _method: str
+    _interceptor: grpc.UnaryStreamClientInterceptor
+
+    def __init__(
+        self,
+        thunk: Callable,
+        method: str,
+        interceptor: grpc.UnaryStreamClientInterceptor,
+    ):
+        self._thunk = thunk
+        self._method = method
+        self._interceptor = interceptor
+
+    def __call__(
+        self,
+        request: Any,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ):
+        client_call_details = _ClientCallDetails(
+            self._method,
+            timeout,
+            metadata,
+            credentials,
+            wait_for_ready,
+            compression,
+        )
+
+        def continuation(new_details, request):
+            (
+                new_method,
+                new_timeout,
+                new_metadata,
+                new_credentials,
+                new_wait_for_ready,
+                new_compression,
+            ) = _unwrap_client_call_details(new_details, client_call_details)
+            return self._thunk(new_method)(
+                request,
+                timeout=new_timeout,
+                metadata=new_metadata,
+                credentials=new_credentials,
+                wait_for_ready=new_wait_for_ready,
+                compression=new_compression,
+            )
+
+        try:
+            return self._interceptor.intercept_unary_stream(
+                continuation, client_call_details, request
+            )
+        except Exception as exception:  # pylint:disable=broad-except
+            return _FailureOutcome(exception, sys.exc_info()[2])
+
+
+class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
+    _thunk: Callable
+    _method: str
+    _interceptor: grpc.StreamUnaryClientInterceptor
+
+    def __init__(
+        self,
+        thunk: Callable,
+        method: str,
+        interceptor: grpc.StreamUnaryClientInterceptor,
+    ):
+        self._thunk = thunk
+        self._method = method
+        self._interceptor = interceptor
+
+    def __call__(
+        self,
+        request_iterator: RequestIterableType,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Any:
+        response, ignored_call = self._with_call(
+            request_iterator,
+            timeout=timeout,
+            metadata=metadata,
+            credentials=credentials,
+            wait_for_ready=wait_for_ready,
+            compression=compression,
+        )
+        return response
+
+    def _with_call(
+        self,
+        request_iterator: RequestIterableType,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Tuple[Any, grpc.Call]:
+        client_call_details = _ClientCallDetails(
+            self._method,
+            timeout,
+            metadata,
+            credentials,
+            wait_for_ready,
+            compression,
+        )
+
+        def continuation(new_details, request_iterator):
+            (
+                new_method,
+                new_timeout,
+                new_metadata,
+                new_credentials,
+                new_wait_for_ready,
+                new_compression,
+            ) = _unwrap_client_call_details(new_details, client_call_details)
+            try:
+                response, call = self._thunk(new_method).with_call(
+                    request_iterator,
+                    timeout=new_timeout,
+                    metadata=new_metadata,
+                    credentials=new_credentials,
+                    wait_for_ready=new_wait_for_ready,
+                    compression=new_compression,
+                )
+                return _UnaryOutcome(response, call)
+            except grpc.RpcError as rpc_error:
+                return rpc_error
+            except Exception as exception:  # pylint:disable=broad-except
+                return _FailureOutcome(exception, sys.exc_info()[2])
+
+        call = self._interceptor.intercept_stream_unary(
+            continuation, client_call_details, request_iterator
+        )
+        return call.result(), call
+
+    def with_call(
+        self,
+        request_iterator: RequestIterableType,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Tuple[Any, grpc.Call]:
+        return self._with_call(
+            request_iterator,
+            timeout=timeout,
+            metadata=metadata,
+            credentials=credentials,
+            wait_for_ready=wait_for_ready,
+            compression=compression,
+        )
+
+    def future(
+        self,
+        request_iterator: RequestIterableType,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ) -> Any:
+        client_call_details = _ClientCallDetails(
+            self._method,
+            timeout,
+            metadata,
+            credentials,
+            wait_for_ready,
+            compression,
+        )
+
+        def continuation(new_details, request_iterator):
+            (
+                new_method,
+                new_timeout,
+                new_metadata,
+                new_credentials,
+                new_wait_for_ready,
+                new_compression,
+            ) = _unwrap_client_call_details(new_details, client_call_details)
+            return self._thunk(new_method).future(
+                request_iterator,
+                timeout=new_timeout,
+                metadata=new_metadata,
+                credentials=new_credentials,
+                wait_for_ready=new_wait_for_ready,
+                compression=new_compression,
+            )
+
+        try:
+            return self._interceptor.intercept_stream_unary(
+                continuation, client_call_details, request_iterator
+            )
+        except Exception as exception:  # pylint:disable=broad-except
+            return _FailureOutcome(exception, sys.exc_info()[2])
+
+
+class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
+    _thunk: Callable
+    _method: str
+    _interceptor: grpc.StreamStreamClientInterceptor
+
+    def __init__(
+        self,
+        thunk: Callable,
+        method: str,
+        interceptor: grpc.StreamStreamClientInterceptor,
+    ):
+        self._thunk = thunk
+        self._method = method
+        self._interceptor = interceptor
+
+    def __call__(
+        self,
+        request_iterator: RequestIterableType,
+        timeout: Optional[float] = None,
+        metadata: Optional[MetadataType] = None,
+        credentials: Optional[grpc.CallCredentials] = None,
+        wait_for_ready: Optional[bool] = None,
+        compression: Optional[grpc.Compression] = None,
+    ):
+        client_call_details = _ClientCallDetails(
+            self._method,
+            timeout,
+            metadata,
+            credentials,
+            wait_for_ready,
+            compression,
+        )
+
+        def continuation(new_details, request_iterator):
+            (
+                new_method,
+                new_timeout,
+                new_metadata,
+                new_credentials,
+                new_wait_for_ready,
+                new_compression,
+            ) = _unwrap_client_call_details(new_details, client_call_details)
+            return self._thunk(new_method)(
+                request_iterator,
+                timeout=new_timeout,
+                metadata=new_metadata,
+                credentials=new_credentials,
+                wait_for_ready=new_wait_for_ready,
+                compression=new_compression,
+            )
+
+        try:
+            return self._interceptor.intercept_stream_stream(
+                continuation, client_call_details, request_iterator
+            )
+        except Exception as exception:  # pylint:disable=broad-except
+            return _FailureOutcome(exception, sys.exc_info()[2])
+
+
+class _Channel(grpc.Channel):
+    _channel: grpc.Channel
+    _interceptor: Union[
+        grpc.UnaryUnaryClientInterceptor,
+        grpc.UnaryStreamClientInterceptor,
+        grpc.StreamStreamClientInterceptor,
+        grpc.StreamUnaryClientInterceptor,
+    ]
+
+    def __init__(
+        self,
+        channel: grpc.Channel,
+        interceptor: Union[
+            grpc.UnaryUnaryClientInterceptor,
+            grpc.UnaryStreamClientInterceptor,
+            grpc.StreamStreamClientInterceptor,
+            grpc.StreamUnaryClientInterceptor,
+        ],
+    ):
+        self._channel = channel
+        self._interceptor = interceptor
+
+    def subscribe(
+        self, callback: Callable, try_to_connect: Optional[bool] = False
+    ):
+        self._channel.subscribe(callback, try_to_connect=try_to_connect)
+
+    def unsubscribe(self, callback: Callable):
+        self._channel.unsubscribe(callback)
+
+    # pylint: disable=arguments-differ
+    def unary_unary(
+        self,
+        method: str,
+        request_serializer: Optional[SerializingFunction] = None,
+        response_deserializer: Optional[DeserializingFunction] = None,
+        _registered_method: Optional[bool] = False,
+    ) -> grpc.UnaryUnaryMultiCallable:
+        # pytype: disable=wrong-arg-count
+        thunk = lambda m: self._channel.unary_unary(
+            m,
+            request_serializer,
+            response_deserializer,
+            _registered_method,
+        )
+        # pytype: enable=wrong-arg-count
+        if isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor):
+            return _UnaryUnaryMultiCallable(thunk, method, self._interceptor)
+        else:
+            return thunk(method)
+
+    # pylint: disable=arguments-differ
+    def unary_stream(
+        self,
+        method: str,
+        request_serializer: Optional[SerializingFunction] = None,
+        response_deserializer: Optional[DeserializingFunction] = None,
+        _registered_method: Optional[bool] = False,
+    ) -> grpc.UnaryStreamMultiCallable:
+        # pytype: disable=wrong-arg-count
+        thunk = lambda m: self._channel.unary_stream(
+            m,
+            request_serializer,
+            response_deserializer,
+            _registered_method,
+        )
+        # pytype: enable=wrong-arg-count
+        if isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor):
+            return _UnaryStreamMultiCallable(thunk, method, self._interceptor)
+        else:
+            return thunk(method)
+
+    # pylint: disable=arguments-differ
+    def stream_unary(
+        self,
+        method: str,
+        request_serializer: Optional[SerializingFunction] = None,
+        response_deserializer: Optional[DeserializingFunction] = None,
+        _registered_method: Optional[bool] = False,
+    ) -> grpc.StreamUnaryMultiCallable:
+        # pytype: disable=wrong-arg-count
+        thunk = lambda m: self._channel.stream_unary(
+            m,
+            request_serializer,
+            response_deserializer,
+            _registered_method,
+        )
+        # pytype: enable=wrong-arg-count
+        if isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor):
+            return _StreamUnaryMultiCallable(thunk, method, self._interceptor)
+        else:
+            return thunk(method)
+
+    # pylint: disable=arguments-differ
+    def stream_stream(
+        self,
+        method: str,
+        request_serializer: Optional[SerializingFunction] = None,
+        response_deserializer: Optional[DeserializingFunction] = None,
+        _registered_method: Optional[bool] = False,
+    ) -> grpc.StreamStreamMultiCallable:
+        # pytype: disable=wrong-arg-count
+        thunk = lambda m: self._channel.stream_stream(
+            m,
+            request_serializer,
+            response_deserializer,
+            _registered_method,
+        )
+        # pytype: enable=wrong-arg-count
+        if isinstance(self._interceptor, grpc.StreamStreamClientInterceptor):
+            return _StreamStreamMultiCallable(thunk, method, self._interceptor)
+        else:
+            return thunk(method)
+
+    def _close(self):
+        self._channel.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self._close()
+        return False
+
+    def close(self):
+        self._channel.close()
+
+
+def intercept_channel(
+    channel: grpc.Channel,
+    *interceptors: Optional[
+        Sequence[
+            Union[
+                grpc.UnaryUnaryClientInterceptor,
+                grpc.UnaryStreamClientInterceptor,
+                grpc.StreamStreamClientInterceptor,
+                grpc.StreamUnaryClientInterceptor,
+            ]
+        ]
+    ],
+) -> grpc.Channel:
+    for interceptor in reversed(list(interceptors)):
+        if (
+            not isinstance(interceptor, grpc.UnaryUnaryClientInterceptor)
+            and not isinstance(interceptor, grpc.UnaryStreamClientInterceptor)
+            and not isinstance(interceptor, grpc.StreamUnaryClientInterceptor)
+            and not isinstance(interceptor, grpc.StreamStreamClientInterceptor)
+        ):
+            raise TypeError(
+                "interceptor must be "
+                "grpc.UnaryUnaryClientInterceptor or "
+                "grpc.UnaryStreamClientInterceptor or "
+                "grpc.StreamUnaryClientInterceptor or "
+                "grpc.StreamStreamClientInterceptor or "
+            )
+        channel = _Channel(channel, interceptor)
+    return channel