diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/grpc/_interceptor.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/grpc/_interceptor.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/grpc/_interceptor.py | 813 |
1 files changed, 813 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/grpc/_interceptor.py b/.venv/lib/python3.12/site-packages/grpc/_interceptor.py new file mode 100644 index 00000000..94abafeb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/grpc/_interceptor.py @@ -0,0 +1,813 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementation of gRPC Python interceptors.""" + +import collections +import sys +import types +from typing import Any, Callable, Optional, Sequence, Tuple, Union + +import grpc + +from ._typing import DeserializingFunction +from ._typing import DoneCallbackType +from ._typing import MetadataType +from ._typing import RequestIterableType +from ._typing import SerializingFunction + + +class _ServicePipeline(object): + interceptors: Tuple[grpc.ServerInterceptor] + + def __init__(self, interceptors: Sequence[grpc.ServerInterceptor]): + self.interceptors = tuple(interceptors) + + def _continuation(self, thunk: Callable, index: int) -> Callable: + return lambda context: self._intercept_at(thunk, index, context) + + def _intercept_at( + self, thunk: Callable, index: int, context: grpc.HandlerCallDetails + ) -> grpc.RpcMethodHandler: + if index < len(self.interceptors): + interceptor = self.interceptors[index] + thunk = self._continuation(thunk, index + 1) + return interceptor.intercept_service(thunk, context) + else: + return thunk(context) + + def execute( + self, thunk: Callable, context: grpc.HandlerCallDetails + ) -> grpc.RpcMethodHandler: + return self._intercept_at(thunk, 0, context) + + +def service_pipeline( + interceptors: Optional[Sequence[grpc.ServerInterceptor]], +) -> Optional[_ServicePipeline]: + return _ServicePipeline(interceptors) if interceptors else None + + +class _ClientCallDetails( + collections.namedtuple( + "_ClientCallDetails", + ( + "method", + "timeout", + "metadata", + "credentials", + "wait_for_ready", + "compression", + ), + ), + grpc.ClientCallDetails, +): + pass + + +def _unwrap_client_call_details( + call_details: grpc.ClientCallDetails, + default_details: grpc.ClientCallDetails, +) -> Tuple[ + str, float, MetadataType, grpc.CallCredentials, bool, grpc.Compression +]: + try: + method = call_details.method # pytype: disable=attribute-error + except AttributeError: + method = default_details.method # pytype: disable=attribute-error + + try: + timeout = call_details.timeout # pytype: disable=attribute-error + except AttributeError: + timeout = default_details.timeout # pytype: disable=attribute-error + + try: + metadata = call_details.metadata # pytype: disable=attribute-error + except AttributeError: + metadata = default_details.metadata # pytype: disable=attribute-error + + try: + credentials = ( + call_details.credentials + ) # pytype: disable=attribute-error + except AttributeError: + credentials = ( + default_details.credentials + ) # pytype: disable=attribute-error + + try: + wait_for_ready = ( + call_details.wait_for_ready + ) # pytype: disable=attribute-error + except AttributeError: + wait_for_ready = ( + default_details.wait_for_ready + ) # pytype: disable=attribute-error + + try: + compression = ( + call_details.compression + ) # pytype: disable=attribute-error + except AttributeError: + compression = ( + default_details.compression + ) # pytype: disable=attribute-error + + return method, timeout, metadata, credentials, wait_for_ready, compression + + +class _FailureOutcome( + grpc.RpcError, grpc.Future, grpc.Call +): # pylint: disable=too-many-ancestors + _exception: Exception + _traceback: types.TracebackType + + def __init__(self, exception: Exception, traceback: types.TracebackType): + super(_FailureOutcome, self).__init__() + self._exception = exception + self._traceback = traceback + + def initial_metadata(self) -> Optional[MetadataType]: + return None + + def trailing_metadata(self) -> Optional[MetadataType]: + return None + + def code(self) -> Optional[grpc.StatusCode]: + return grpc.StatusCode.INTERNAL + + def details(self) -> Optional[str]: + return "Exception raised while intercepting the RPC" + + def cancel(self) -> bool: + return False + + def cancelled(self) -> bool: + return False + + def is_active(self) -> bool: + return False + + def time_remaining(self) -> Optional[float]: + return None + + def running(self) -> bool: + return False + + def done(self) -> bool: + return True + + def result(self, ignored_timeout: Optional[float] = None): + raise self._exception + + def exception( + self, ignored_timeout: Optional[float] = None + ) -> Optional[Exception]: + return self._exception + + def traceback( + self, ignored_timeout: Optional[float] = None + ) -> Optional[types.TracebackType]: + return self._traceback + + def add_callback(self, unused_callback) -> bool: + return False + + def add_done_callback(self, fn: DoneCallbackType) -> None: + fn(self) + + def __iter__(self): + return self + + def __next__(self): + raise self._exception + + def next(self): + return self.__next__() + + +class _UnaryOutcome(grpc.Call, grpc.Future): + _response: Any + _call: grpc.Call + + def __init__(self, response: Any, call: grpc.Call): + self._response = response + self._call = call + + def initial_metadata(self) -> Optional[MetadataType]: + return self._call.initial_metadata() + + def trailing_metadata(self) -> Optional[MetadataType]: + return self._call.trailing_metadata() + + def code(self) -> Optional[grpc.StatusCode]: + return self._call.code() + + def details(self) -> Optional[str]: + return self._call.details() + + def is_active(self) -> bool: + return self._call.is_active() + + def time_remaining(self) -> Optional[float]: + return self._call.time_remaining() + + def cancel(self) -> bool: + return self._call.cancel() + + def add_callback(self, callback) -> bool: + return self._call.add_callback(callback) + + def cancelled(self) -> bool: + return False + + def running(self) -> bool: + return False + + def done(self) -> bool: + return True + + def result(self, ignored_timeout: Optional[float] = None): + return self._response + + def exception(self, ignored_timeout: Optional[float] = None): + return None + + def traceback(self, ignored_timeout: Optional[float] = None): + return None + + def add_done_callback(self, fn: DoneCallbackType) -> None: + fn(self) + + +class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): + _thunk: Callable + _method: str + _interceptor: grpc.UnaryUnaryClientInterceptor + + def __init__( + self, + thunk: Callable, + method: str, + interceptor: grpc.UnaryUnaryClientInterceptor, + ): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__( + self, + request: Any, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ) -> Any: + response, ignored_call = self._with_call( + request, + timeout=timeout, + metadata=metadata, + credentials=credentials, + wait_for_ready=wait_for_ready, + compression=compression, + ) + return response + + def _with_call( + self, + request: Any, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ) -> Tuple[Any, grpc.Call]: + client_call_details = _ClientCallDetails( + self._method, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) + + def continuation(new_details, request): + ( + new_method, + new_timeout, + new_metadata, + new_credentials, + new_wait_for_ready, + new_compression, + ) = _unwrap_client_call_details(new_details, client_call_details) + try: + response, call = self._thunk(new_method).with_call( + request, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials, + wait_for_ready=new_wait_for_ready, + compression=new_compression, + ) + return _UnaryOutcome(response, call) + except grpc.RpcError as rpc_error: + return rpc_error + except Exception as exception: # pylint:disable=broad-except + return _FailureOutcome(exception, sys.exc_info()[2]) + + call = self._interceptor.intercept_unary_unary( + continuation, client_call_details, request + ) + return call.result(), call + + def with_call( + self, + request: Any, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ) -> Tuple[Any, grpc.Call]: + return self._with_call( + request, + timeout=timeout, + metadata=metadata, + credentials=credentials, + wait_for_ready=wait_for_ready, + compression=compression, + ) + + def future( + self, + request: Any, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ) -> Any: + client_call_details = _ClientCallDetails( + self._method, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) + + def continuation(new_details, request): + ( + new_method, + new_timeout, + new_metadata, + new_credentials, + new_wait_for_ready, + new_compression, + ) = _unwrap_client_call_details(new_details, client_call_details) + return self._thunk(new_method).future( + request, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials, + wait_for_ready=new_wait_for_ready, + compression=new_compression, + ) + + try: + return self._interceptor.intercept_unary_unary( + continuation, client_call_details, request + ) + except Exception as exception: # pylint:disable=broad-except + return _FailureOutcome(exception, sys.exc_info()[2]) + + +class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): + _thunk: Callable + _method: str + _interceptor: grpc.UnaryStreamClientInterceptor + + def __init__( + self, + thunk: Callable, + method: str, + interceptor: grpc.UnaryStreamClientInterceptor, + ): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__( + self, + request: Any, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ): + client_call_details = _ClientCallDetails( + self._method, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) + + def continuation(new_details, request): + ( + new_method, + new_timeout, + new_metadata, + new_credentials, + new_wait_for_ready, + new_compression, + ) = _unwrap_client_call_details(new_details, client_call_details) + return self._thunk(new_method)( + request, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials, + wait_for_ready=new_wait_for_ready, + compression=new_compression, + ) + + try: + return self._interceptor.intercept_unary_stream( + continuation, client_call_details, request + ) + except Exception as exception: # pylint:disable=broad-except + return _FailureOutcome(exception, sys.exc_info()[2]) + + +class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): + _thunk: Callable + _method: str + _interceptor: grpc.StreamUnaryClientInterceptor + + def __init__( + self, + thunk: Callable, + method: str, + interceptor: grpc.StreamUnaryClientInterceptor, + ): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__( + self, + request_iterator: RequestIterableType, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ) -> Any: + response, ignored_call = self._with_call( + request_iterator, + timeout=timeout, + metadata=metadata, + credentials=credentials, + wait_for_ready=wait_for_ready, + compression=compression, + ) + return response + + def _with_call( + self, + request_iterator: RequestIterableType, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ) -> Tuple[Any, grpc.Call]: + client_call_details = _ClientCallDetails( + self._method, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) + + def continuation(new_details, request_iterator): + ( + new_method, + new_timeout, + new_metadata, + new_credentials, + new_wait_for_ready, + new_compression, + ) = _unwrap_client_call_details(new_details, client_call_details) + try: + response, call = self._thunk(new_method).with_call( + request_iterator, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials, + wait_for_ready=new_wait_for_ready, + compression=new_compression, + ) + return _UnaryOutcome(response, call) + except grpc.RpcError as rpc_error: + return rpc_error + except Exception as exception: # pylint:disable=broad-except + return _FailureOutcome(exception, sys.exc_info()[2]) + + call = self._interceptor.intercept_stream_unary( + continuation, client_call_details, request_iterator + ) + return call.result(), call + + def with_call( + self, + request_iterator: RequestIterableType, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ) -> Tuple[Any, grpc.Call]: + return self._with_call( + request_iterator, + timeout=timeout, + metadata=metadata, + credentials=credentials, + wait_for_ready=wait_for_ready, + compression=compression, + ) + + def future( + self, + request_iterator: RequestIterableType, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ) -> Any: + client_call_details = _ClientCallDetails( + self._method, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) + + def continuation(new_details, request_iterator): + ( + new_method, + new_timeout, + new_metadata, + new_credentials, + new_wait_for_ready, + new_compression, + ) = _unwrap_client_call_details(new_details, client_call_details) + return self._thunk(new_method).future( + request_iterator, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials, + wait_for_ready=new_wait_for_ready, + compression=new_compression, + ) + + try: + return self._interceptor.intercept_stream_unary( + continuation, client_call_details, request_iterator + ) + except Exception as exception: # pylint:disable=broad-except + return _FailureOutcome(exception, sys.exc_info()[2]) + + +class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): + _thunk: Callable + _method: str + _interceptor: grpc.StreamStreamClientInterceptor + + def __init__( + self, + thunk: Callable, + method: str, + interceptor: grpc.StreamStreamClientInterceptor, + ): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__( + self, + request_iterator: RequestIterableType, + timeout: Optional[float] = None, + metadata: Optional[MetadataType] = None, + credentials: Optional[grpc.CallCredentials] = None, + wait_for_ready: Optional[bool] = None, + compression: Optional[grpc.Compression] = None, + ): + client_call_details = _ClientCallDetails( + self._method, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) + + def continuation(new_details, request_iterator): + ( + new_method, + new_timeout, + new_metadata, + new_credentials, + new_wait_for_ready, + new_compression, + ) = _unwrap_client_call_details(new_details, client_call_details) + return self._thunk(new_method)( + request_iterator, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials, + wait_for_ready=new_wait_for_ready, + compression=new_compression, + ) + + try: + return self._interceptor.intercept_stream_stream( + continuation, client_call_details, request_iterator + ) + except Exception as exception: # pylint:disable=broad-except + return _FailureOutcome(exception, sys.exc_info()[2]) + + +class _Channel(grpc.Channel): + _channel: grpc.Channel + _interceptor: Union[ + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + ] + + def __init__( + self, + channel: grpc.Channel, + interceptor: Union[ + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + ], + ): + self._channel = channel + self._interceptor = interceptor + + def subscribe( + self, callback: Callable, try_to_connect: Optional[bool] = False + ): + self._channel.subscribe(callback, try_to_connect=try_to_connect) + + def unsubscribe(self, callback: Callable): + self._channel.unsubscribe(callback) + + # pylint: disable=arguments-differ + def unary_unary( + self, + method: str, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + _registered_method: Optional[bool] = False, + ) -> grpc.UnaryUnaryMultiCallable: + # pytype: disable=wrong-arg-count + thunk = lambda m: self._channel.unary_unary( + m, + request_serializer, + response_deserializer, + _registered_method, + ) + # pytype: enable=wrong-arg-count + if isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor): + return _UnaryUnaryMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + # pylint: disable=arguments-differ + def unary_stream( + self, + method: str, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + _registered_method: Optional[bool] = False, + ) -> grpc.UnaryStreamMultiCallable: + # pytype: disable=wrong-arg-count + thunk = lambda m: self._channel.unary_stream( + m, + request_serializer, + response_deserializer, + _registered_method, + ) + # pytype: enable=wrong-arg-count + if isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor): + return _UnaryStreamMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + # pylint: disable=arguments-differ + def stream_unary( + self, + method: str, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + _registered_method: Optional[bool] = False, + ) -> grpc.StreamUnaryMultiCallable: + # pytype: disable=wrong-arg-count + thunk = lambda m: self._channel.stream_unary( + m, + request_serializer, + response_deserializer, + _registered_method, + ) + # pytype: enable=wrong-arg-count + if isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor): + return _StreamUnaryMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + # pylint: disable=arguments-differ + def stream_stream( + self, + method: str, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + _registered_method: Optional[bool] = False, + ) -> grpc.StreamStreamMultiCallable: + # pytype: disable=wrong-arg-count + thunk = lambda m: self._channel.stream_stream( + m, + request_serializer, + response_deserializer, + _registered_method, + ) + # pytype: enable=wrong-arg-count + if isinstance(self._interceptor, grpc.StreamStreamClientInterceptor): + return _StreamStreamMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + def _close(self): + self._channel.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._close() + return False + + def close(self): + self._channel.close() + + +def intercept_channel( + channel: grpc.Channel, + *interceptors: Optional[ + Sequence[ + Union[ + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + ] + ] + ], +) -> grpc.Channel: + for interceptor in reversed(list(interceptors)): + if ( + not isinstance(interceptor, grpc.UnaryUnaryClientInterceptor) + and not isinstance(interceptor, grpc.UnaryStreamClientInterceptor) + and not isinstance(interceptor, grpc.StreamUnaryClientInterceptor) + and not isinstance(interceptor, grpc.StreamStreamClientInterceptor) + ): + raise TypeError( + "interceptor must be " + "grpc.UnaryUnaryClientInterceptor or " + "grpc.UnaryStreamClientInterceptor or " + "grpc.StreamUnaryClientInterceptor or " + "grpc.StreamStreamClientInterceptor or " + ) + channel = _Channel(channel, interceptor) + return channel |