about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/__init__.py151
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/__init__.py7
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/client.py94
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/server.py100
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/client.py92
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/consts.py1
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/server.py66
7 files changed, 511 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/__init__.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/__init__.py
new file mode 100644
index 00000000..d9dcdddb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/__init__.py
@@ -0,0 +1,151 @@
+from functools import wraps
+
+import grpc
+from grpc import Channel, Server, intercept_channel
+from grpc.aio import Channel as AsyncChannel
+from grpc.aio import Server as AsyncServer
+
+from sentry_sdk.integrations import Integration
+
+from .client import ClientInterceptor
+from .server import ServerInterceptor
+from .aio.server import ServerInterceptor as AsyncServerInterceptor
+from .aio.client import (
+    SentryUnaryUnaryClientInterceptor as AsyncUnaryUnaryClientInterceptor,
+)
+from .aio.client import (
+    SentryUnaryStreamClientInterceptor as AsyncUnaryStreamClientIntercetor,
+)
+
+from typing import TYPE_CHECKING, Any, Optional, Sequence
+
+# Hack to get new Python features working in older versions
+# without introducing a hard dependency on `typing_extensions`
+# from: https://stackoverflow.com/a/71944042/300572
+if TYPE_CHECKING:
+    from typing import ParamSpec, Callable
+else:
+    # Fake ParamSpec
+    class ParamSpec:
+        def __init__(self, _):
+            self.args = None
+            self.kwargs = None
+
+    # Callable[anything] will return None
+    class _Callable:
+        def __getitem__(self, _):
+            return None
+
+    # Make instances
+    Callable = _Callable()
+
+P = ParamSpec("P")
+
+
+def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]:
+    "Wrapper for synchronous secure and insecure channel."
+
+    @wraps(func)
+    def patched_channel(*args: Any, **kwargs: Any) -> Channel:
+        channel = func(*args, **kwargs)
+        if not ClientInterceptor._is_intercepted:
+            ClientInterceptor._is_intercepted = True
+            return intercept_channel(channel, ClientInterceptor())
+        else:
+            return channel
+
+    return patched_channel
+
+
+def _wrap_intercept_channel(func: Callable[P, Channel]) -> Callable[P, Channel]:
+    @wraps(func)
+    def patched_intercept_channel(
+        channel: Channel, *interceptors: grpc.ServerInterceptor
+    ) -> Channel:
+        if ClientInterceptor._is_intercepted:
+            interceptors = tuple(
+                [
+                    interceptor
+                    for interceptor in interceptors
+                    if not isinstance(interceptor, ClientInterceptor)
+                ]
+            )
+        else:
+            interceptors = interceptors
+        return intercept_channel(channel, *interceptors)
+
+    return patched_intercept_channel  # type: ignore
+
+
+def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]:
+    "Wrapper for asynchronous secure and insecure channel."
+
+    @wraps(func)
+    def patched_channel(  # type: ignore
+        *args: P.args,
+        interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None,
+        **kwargs: P.kwargs,
+    ) -> Channel:
+        sentry_interceptors = [
+            AsyncUnaryUnaryClientInterceptor(),
+            AsyncUnaryStreamClientIntercetor(),
+        ]
+        interceptors = [*sentry_interceptors, *(interceptors or [])]
+        return func(*args, interceptors=interceptors, **kwargs)  # type: ignore
+
+    return patched_channel  # type: ignore
+
+
+def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]:
+    """Wrapper for synchronous server."""
+
+    @wraps(func)
+    def patched_server(  # type: ignore
+        *args: P.args,
+        interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
+        **kwargs: P.kwargs,
+    ) -> Server:
+        interceptors = [
+            interceptor
+            for interceptor in interceptors or []
+            if not isinstance(interceptor, ServerInterceptor)
+        ]
+        server_interceptor = ServerInterceptor()
+        interceptors = [server_interceptor, *(interceptors or [])]
+        return func(*args, interceptors=interceptors, **kwargs)  # type: ignore
+
+    return patched_server  # type: ignore
+
+
+def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]:
+    """Wrapper for asynchronous server."""
+
+    @wraps(func)
+    def patched_aio_server(  # type: ignore
+        *args: P.args,
+        interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
+        **kwargs: P.kwargs,
+    ) -> Server:
+        server_interceptor = AsyncServerInterceptor()
+        interceptors = (server_interceptor, *(interceptors or []))
+        return func(*args, interceptors=interceptors, **kwargs)  # type: ignore
+
+    return patched_aio_server  # type: ignore
+
+
+class GRPCIntegration(Integration):
+    identifier = "grpc"
+
+    @staticmethod
+    def setup_once() -> None:
+        import grpc
+
+        grpc.insecure_channel = _wrap_channel_sync(grpc.insecure_channel)
+        grpc.secure_channel = _wrap_channel_sync(grpc.secure_channel)
+        grpc.intercept_channel = _wrap_intercept_channel(grpc.intercept_channel)
+
+        grpc.aio.insecure_channel = _wrap_channel_async(grpc.aio.insecure_channel)
+        grpc.aio.secure_channel = _wrap_channel_async(grpc.aio.secure_channel)
+
+        grpc.server = _wrap_sync_server(grpc.server)
+        grpc.aio.server = _wrap_async_server(grpc.aio.server)
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/__init__.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/__init__.py
new file mode 100644
index 00000000..5b9e3b99
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/__init__.py
@@ -0,0 +1,7 @@
+from .server import ServerInterceptor
+from .client import ClientInterceptor
+
+__all__ = [
+    "ClientInterceptor",
+    "ServerInterceptor",
+]
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/client.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/client.py
new file mode 100644
index 00000000..ff3c2131
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/client.py
@@ -0,0 +1,94 @@
+from typing import Callable, Union, AsyncIterable, Any
+
+from grpc.aio import (
+    UnaryUnaryClientInterceptor,
+    UnaryStreamClientInterceptor,
+    ClientCallDetails,
+    UnaryUnaryCall,
+    UnaryStreamCall,
+    Metadata,
+)
+from google.protobuf.message import Message
+
+import sentry_sdk
+from sentry_sdk.consts import OP
+from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
+
+
+class ClientInterceptor:
+    @staticmethod
+    def _update_client_call_details_metadata_from_scope(
+        client_call_details: ClientCallDetails,
+    ) -> ClientCallDetails:
+        if client_call_details.metadata is None:
+            client_call_details = client_call_details._replace(metadata=Metadata())
+        elif not isinstance(client_call_details.metadata, Metadata):
+            # This is a workaround for a GRPC bug, which was fixed in grpcio v1.60.0
+            # See https://github.com/grpc/grpc/issues/34298.
+            client_call_details = client_call_details._replace(
+                metadata=Metadata.from_tuple(client_call_details.metadata)
+            )
+        for (
+            key,
+            value,
+        ) in sentry_sdk.get_current_scope().iter_trace_propagation_headers():
+            client_call_details.metadata.add(key, value)
+        return client_call_details
+
+
+class SentryUnaryUnaryClientInterceptor(ClientInterceptor, UnaryUnaryClientInterceptor):  # type: ignore
+    async def intercept_unary_unary(
+        self,
+        continuation: Callable[[ClientCallDetails, Message], UnaryUnaryCall],
+        client_call_details: ClientCallDetails,
+        request: Message,
+    ) -> Union[UnaryUnaryCall, Message]:
+        method = client_call_details.method
+
+        with sentry_sdk.start_span(
+            op=OP.GRPC_CLIENT,
+            name="unary unary call to %s" % method.decode(),
+            origin=SPAN_ORIGIN,
+        ) as span:
+            span.set_data("type", "unary unary")
+            span.set_data("method", method)
+
+            client_call_details = self._update_client_call_details_metadata_from_scope(
+                client_call_details
+            )
+
+            response = await continuation(client_call_details, request)
+            status_code = await response.code()
+            span.set_data("code", status_code.name)
+
+            return response
+
+
+class SentryUnaryStreamClientInterceptor(
+    ClientInterceptor, UnaryStreamClientInterceptor  # type: ignore
+):
+    async def intercept_unary_stream(
+        self,
+        continuation: Callable[[ClientCallDetails, Message], UnaryStreamCall],
+        client_call_details: ClientCallDetails,
+        request: Message,
+    ) -> Union[AsyncIterable[Any], UnaryStreamCall]:
+        method = client_call_details.method
+
+        with sentry_sdk.start_span(
+            op=OP.GRPC_CLIENT,
+            name="unary stream call to %s" % method.decode(),
+            origin=SPAN_ORIGIN,
+        ) as span:
+            span.set_data("type", "unary stream")
+            span.set_data("method", method)
+
+            client_call_details = self._update_client_call_details_metadata_from_scope(
+                client_call_details
+            )
+
+            response = await continuation(client_call_details, request)
+            # status_code = await response.code()
+            # span.set_data("code", status_code)
+
+            return response
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/server.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/server.py
new file mode 100644
index 00000000..381c6310
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/aio/server.py
@@ -0,0 +1,100 @@
+import sentry_sdk
+from sentry_sdk.consts import OP
+from sentry_sdk.integrations import DidNotEnable
+from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
+from sentry_sdk.tracing import Transaction, TransactionSource
+from sentry_sdk.utils import event_from_exception
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from collections.abc import Awaitable, Callable
+    from typing import Any, Optional
+
+
+try:
+    import grpc
+    from grpc import HandlerCallDetails, RpcMethodHandler
+    from grpc.aio import AbortError, ServicerContext
+except ImportError:
+    raise DidNotEnable("grpcio is not installed")
+
+
+class ServerInterceptor(grpc.aio.ServerInterceptor):  # type: ignore
+    def __init__(self, find_name=None):
+        # type: (ServerInterceptor, Callable[[ServicerContext], str] | None) -> None
+        self._find_method_name = find_name or self._find_name
+
+        super().__init__()
+
+    async def intercept_service(self, continuation, handler_call_details):
+        # type: (ServerInterceptor, Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]], HandlerCallDetails) -> Optional[Awaitable[RpcMethodHandler]]
+        self._handler_call_details = handler_call_details
+        handler = await continuation(handler_call_details)
+        if handler is None:
+            return None
+
+        if not handler.request_streaming and not handler.response_streaming:
+            handler_factory = grpc.unary_unary_rpc_method_handler
+
+            async def wrapped(request, context):
+                # type: (Any, ServicerContext) -> Any
+                name = self._find_method_name(context)
+                if not name:
+                    return await handler(request, context)
+
+                # What if the headers are empty?
+                transaction = Transaction.continue_from_headers(
+                    dict(context.invocation_metadata()),
+                    op=OP.GRPC_SERVER,
+                    name=name,
+                    source=TransactionSource.CUSTOM,
+                    origin=SPAN_ORIGIN,
+                )
+
+                with sentry_sdk.start_transaction(transaction=transaction):
+                    try:
+                        return await handler.unary_unary(request, context)
+                    except AbortError:
+                        raise
+                    except Exception as exc:
+                        event, hint = event_from_exception(
+                            exc,
+                            mechanism={"type": "grpc", "handled": False},
+                        )
+                        sentry_sdk.capture_event(event, hint=hint)
+                        raise
+
+        elif not handler.request_streaming and handler.response_streaming:
+            handler_factory = grpc.unary_stream_rpc_method_handler
+
+            async def wrapped(request, context):  # type: ignore
+                # type: (Any, ServicerContext) -> Any
+                async for r in handler.unary_stream(request, context):
+                    yield r
+
+        elif handler.request_streaming and not handler.response_streaming:
+            handler_factory = grpc.stream_unary_rpc_method_handler
+
+            async def wrapped(request, context):
+                # type: (Any, ServicerContext) -> Any
+                response = handler.stream_unary(request, context)
+                return await response
+
+        elif handler.request_streaming and handler.response_streaming:
+            handler_factory = grpc.stream_stream_rpc_method_handler
+
+            async def wrapped(request, context):  # type: ignore
+                # type: (Any, ServicerContext) -> Any
+                async for r in handler.stream_stream(request, context):
+                    yield r
+
+        return handler_factory(
+            wrapped,
+            request_deserializer=handler.request_deserializer,
+            response_serializer=handler.response_serializer,
+        )
+
+    def _find_name(self, context):
+        # type: (ServicerContext) -> str
+        return self._handler_call_details.method
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/client.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/client.py
new file mode 100644
index 00000000..a5b4f9f5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/client.py
@@ -0,0 +1,92 @@
+import sentry_sdk
+from sentry_sdk.consts import OP
+from sentry_sdk.integrations import DidNotEnable
+from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any, Callable, Iterator, Iterable, Union
+
+try:
+    import grpc
+    from grpc import ClientCallDetails, Call
+    from grpc._interceptor import _UnaryOutcome
+    from grpc.aio._interceptor import UnaryStreamCall
+    from google.protobuf.message import Message
+except ImportError:
+    raise DidNotEnable("grpcio is not installed")
+
+
+class ClientInterceptor(
+    grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor  # type: ignore
+):
+    _is_intercepted = False
+
+    def intercept_unary_unary(self, continuation, client_call_details, request):
+        # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], _UnaryOutcome], ClientCallDetails, Message) -> _UnaryOutcome
+        method = client_call_details.method
+
+        with sentry_sdk.start_span(
+            op=OP.GRPC_CLIENT,
+            name="unary unary call to %s" % method,
+            origin=SPAN_ORIGIN,
+        ) as span:
+            span.set_data("type", "unary unary")
+            span.set_data("method", method)
+
+            client_call_details = self._update_client_call_details_metadata_from_scope(
+                client_call_details
+            )
+
+            response = continuation(client_call_details, request)
+            span.set_data("code", response.code().name)
+
+            return response
+
+    def intercept_unary_stream(self, continuation, client_call_details, request):
+        # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], Union[Iterable[Any], UnaryStreamCall]], ClientCallDetails, Message) -> Union[Iterator[Message], Call]
+        method = client_call_details.method
+
+        with sentry_sdk.start_span(
+            op=OP.GRPC_CLIENT,
+            name="unary stream call to %s" % method,
+            origin=SPAN_ORIGIN,
+        ) as span:
+            span.set_data("type", "unary stream")
+            span.set_data("method", method)
+
+            client_call_details = self._update_client_call_details_metadata_from_scope(
+                client_call_details
+            )
+
+            response = continuation(
+                client_call_details, request
+            )  # type: UnaryStreamCall
+            # Setting code on unary-stream leads to execution getting stuck
+            # span.set_data("code", response.code().name)
+
+            return response
+
+    @staticmethod
+    def _update_client_call_details_metadata_from_scope(client_call_details):
+        # type: (ClientCallDetails) -> ClientCallDetails
+        metadata = (
+            list(client_call_details.metadata) if client_call_details.metadata else []
+        )
+        for (
+            key,
+            value,
+        ) in sentry_sdk.get_current_scope().iter_trace_propagation_headers():
+            metadata.append((key, value))
+
+        client_call_details = grpc._interceptor._ClientCallDetails(
+            method=client_call_details.method,
+            timeout=client_call_details.timeout,
+            metadata=metadata,
+            credentials=client_call_details.credentials,
+            wait_for_ready=client_call_details.wait_for_ready,
+            compression=client_call_details.compression,
+        )
+
+        return client_call_details
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/consts.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/consts.py
new file mode 100644
index 00000000..9fdb975c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/consts.py
@@ -0,0 +1 @@
+SPAN_ORIGIN = "auto.grpc.grpc"
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/server.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/server.py
new file mode 100644
index 00000000..0d2792d1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/grpc/server.py
@@ -0,0 +1,66 @@
+import sentry_sdk
+from sentry_sdk.consts import OP
+from sentry_sdk.integrations import DidNotEnable
+from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
+from sentry_sdk.tracing import Transaction, TransactionSource
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Callable, Optional
+    from google.protobuf.message import Message
+
+try:
+    import grpc
+    from grpc import ServicerContext, HandlerCallDetails, RpcMethodHandler
+except ImportError:
+    raise DidNotEnable("grpcio is not installed")
+
+
+class ServerInterceptor(grpc.ServerInterceptor):  # type: ignore
+    def __init__(self, find_name=None):
+        # type: (ServerInterceptor, Optional[Callable[[ServicerContext], str]]) -> None
+        self._find_method_name = find_name or ServerInterceptor._find_name
+
+        super().__init__()
+
+    def intercept_service(self, continuation, handler_call_details):
+        # type: (ServerInterceptor, Callable[[HandlerCallDetails], RpcMethodHandler], HandlerCallDetails) -> RpcMethodHandler
+        handler = continuation(handler_call_details)
+        if not handler or not handler.unary_unary:
+            return handler
+
+        def behavior(request, context):
+            # type: (Message, ServicerContext) -> Message
+            with sentry_sdk.isolation_scope():
+                name = self._find_method_name(context)
+
+                if name:
+                    metadata = dict(context.invocation_metadata())
+
+                    transaction = Transaction.continue_from_headers(
+                        metadata,
+                        op=OP.GRPC_SERVER,
+                        name=name,
+                        source=TransactionSource.CUSTOM,
+                        origin=SPAN_ORIGIN,
+                    )
+
+                    with sentry_sdk.start_transaction(transaction=transaction):
+                        try:
+                            return handler.unary_unary(request, context)
+                        except BaseException as e:
+                            raise e
+                else:
+                    return handler.unary_unary(request, context)
+
+        return grpc.unary_unary_rpc_method_handler(
+            behavior,
+            request_deserializer=handler.request_deserializer,
+            response_serializer=handler.response_serializer,
+        )
+
+    @staticmethod
+    def _find_name(context):
+        # type: (ServicerContext) -> str
+        return context._rpc_event.call_details.method.decode()