1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
from functools import wraps
import grpc
from grpc import Channel, Server, intercept_channel
from grpc.aio import Channel as AsyncChannel
from grpc.aio import Server as AsyncServer
from sentry_sdk.integrations import Integration
from .client import ClientInterceptor
from .server import ServerInterceptor
from .aio.server import ServerInterceptor as AsyncServerInterceptor
from .aio.client import (
SentryUnaryUnaryClientInterceptor as AsyncUnaryUnaryClientInterceptor,
)
from .aio.client import (
SentryUnaryStreamClientInterceptor as AsyncUnaryStreamClientIntercetor,
)
from typing import TYPE_CHECKING, Any, Optional, Sequence
# Compatibility shim: lets the annotations in this module use ``ParamSpec``
# and a subscriptable ``Callable`` on older Python versions without
# introducing a hard dependency on `typing_extensions`.
# Technique taken from: https://stackoverflow.com/a/71944042/300572
if TYPE_CHECKING:
    from typing import ParamSpec, Callable
else:

    class ParamSpec:
        """Runtime placeholder for ``typing.ParamSpec``; its argument is ignored."""

        def __init__(self, _):
            # Only the two attributes referenced in annotations
            # (``P.args`` / ``P.kwargs``) are provided.
            self.args = self.kwargs = None

    class _Callable:
        """Runtime placeholder whose subscription always evaluates to ``None``."""

        def __getitem__(self, _):
            return None

    # Module-level instance so that ``Callable[...]`` can be written in
    # annotations that are evaluated at runtime.
    Callable = _Callable()

P = ParamSpec("P")
def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]:
    """Wrapper for synchronous secure and insecure channel.

    The returned function creates the channel with ``func`` and, as long as
    ``ClientInterceptor._is_intercepted`` is falsy, wraps it with a new
    ``ClientInterceptor`` via ``intercept_channel`` and sets that flag to
    ``True``. While the flag is set, channels are returned as created.
    """

    @wraps(func)
    def patched_channel(*args: Any, **kwargs: Any) -> Channel:
        raw_channel = func(*args, **kwargs)

        # The flag lives on the class, so it is shared by every channel
        # created through this wrapper.
        # NOTE(review): this means only the first channel gets the Sentry
        # interceptor unless the flag is reset elsewhere -- confirm intended.
        if ClientInterceptor._is_intercepted:
            return raw_channel

        ClientInterceptor._is_intercepted = True
        return intercept_channel(raw_channel, ClientInterceptor())

    return patched_channel
def _wrap_intercept_channel(func: Callable[P, Channel]) -> Callable[P, Channel]:
    """Wrapper for ``grpc.intercept_channel``.

    The returned function forwards ``channel`` and the given interceptors to
    ``func``. While ``ClientInterceptor._is_intercepted`` is truthy, every
    ``ClientInterceptor`` instance is first removed from the interceptors
    supplied by the caller; all other interceptors are kept in their
    original order.
    """

    @wraps(func)
    def patched_intercept_channel(channel: Channel, *interceptors: Any) -> Channel:
        # ``interceptors`` are channel (client-side) interceptors, so they are
        # annotated loosely rather than as ``grpc.ServerInterceptor``.
        if ClientInterceptor._is_intercepted:
            interceptors = tuple(
                interceptor
                for interceptor in interceptors
                if not isinstance(interceptor, ClientInterceptor)
            )

        # Delegate to the function that was actually wrapped instead of the
        # module-level import, so a previously installed wrapper around
        # ``grpc.intercept_channel`` is not bypassed.
        return func(channel, *interceptors)  # type: ignore

    return patched_intercept_channel  # type: ignore
def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]:
    """Wrapper for asynchronous secure and insecure channel.

    The returned function accepts the same arguments as ``func``. It builds a
    new list consisting of one ``AsyncUnaryUnaryClientInterceptor``, one
    ``AsyncUnaryStreamClientIntercetor`` and then any interceptors supplied
    through the ``interceptors`` keyword, and passes that list to ``func``.
    """

    @wraps(func)
    def patched_channel(  # type: ignore
        *args: P.args,
        interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> AsyncChannel:
        # Sentry's interceptors come first, the caller's follow unchanged.
        # (The "Intercetor" spelling matches the alias used in the
        # module-level import.)
        combined = [
            AsyncUnaryUnaryClientInterceptor(),
            AsyncUnaryStreamClientIntercetor(),
            *(interceptors or []),
        ]
        return func(*args, interceptors=combined, **kwargs)  # type: ignore

    return patched_channel  # type: ignore
def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]:
    """Wrapper for synchronous server.

    The returned function drops every ``ServerInterceptor`` instance from the
    ``interceptors`` keyword argument, puts a newly created
    ``ServerInterceptor`` in front of the remaining ones and calls ``func``
    with the resulting list.
    """

    @wraps(func)
    def patched_server(  # type: ignore
        *args: P.args,
        interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> Server:
        # Keep only the caller's interceptors that are not Sentry's own, so
        # exactly one ServerInterceptor ends up in the final list.
        caller_interceptors = [
            candidate
            for candidate in interceptors or []
            if not isinstance(candidate, ServerInterceptor)
        ]
        with_sentry = [ServerInterceptor(), *caller_interceptors]
        return func(*args, interceptors=with_sentry, **kwargs)  # type: ignore

    return patched_server  # type: ignore
def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]:
    """Wrapper for asynchronous server.

    The returned function creates an ``AsyncServerInterceptor``, places it in
    front of any interceptors supplied through the ``interceptors`` keyword
    and calls ``func`` with the resulting tuple. Unlike the synchronous
    server wrapper, interceptors supplied by the caller are not filtered.
    """

    @wraps(func)
    def patched_aio_server(  # type: ignore
        *args: P.args,
        interceptors: Optional[Sequence[grpc.aio.ServerInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> AsyncServer:
        # Sentry's interceptor goes first; the caller's follow in order.
        with_sentry = (AsyncServerInterceptor(), *(interceptors or []))
        return func(*args, interceptors=with_sentry, **kwargs)  # type: ignore

    return patched_aio_server  # type: ignore
class GRPCIntegration(Integration):
    """Integration that replaces grpc's channel and server factories with
    wrapped versions that add Sentry's interceptors."""

    identifier = "grpc"

    @staticmethod
    def setup_once() -> None:
        """Install the wrappers on ``grpc`` and ``grpc.aio``.

        Each attribute is replaced by a wrapper around whatever is installed
        at call time, so calling this more than once would stack wrappers.
        """
        import grpc

        aio = grpc.aio
        channel_factories = ("insecure_channel", "secure_channel")

        # Client side, synchronous API.
        for factory_name in channel_factories:
            setattr(grpc, factory_name, _wrap_channel_sync(getattr(grpc, factory_name)))
        grpc.intercept_channel = _wrap_intercept_channel(grpc.intercept_channel)

        # Client side, asyncio API.
        for factory_name in channel_factories:
            setattr(aio, factory_name, _wrap_channel_async(getattr(aio, factory_name)))

        # Server side, both APIs.
        grpc.server = _wrap_sync_server(grpc.server)
        aio.server = _wrap_async_server(aio.server)
|