1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Callable, Iterator, Iterable, Union
try:
import grpc
from grpc import ClientCallDetails, Call
from grpc._interceptor import _UnaryOutcome
from grpc.aio._interceptor import UnaryStreamCall
from google.protobuf.message import Message
except ImportError:
raise DidNotEnable("grpcio is not installed")
class ClientInterceptor(
    grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor  # type: ignore
):
    """gRPC client interceptor that traces outgoing calls with Sentry.

    Every intercepted unary-unary or unary-stream call is wrapped in a span
    with op ``OP.GRPC_CLIENT``, and the trace propagation headers of the
    current scope are appended to the call metadata before the call is
    handed on to ``continuation``.
    """

    # Defaults to False. Nothing inside this class reads or writes the flag;
    # presumably the code that installs the interceptor toggles it -- confirm
    # against the caller.
    _is_intercepted = False

    def intercept_unary_unary(self, continuation, client_call_details, request):
        # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], _UnaryOutcome], ClientCallDetails, Message) -> _UnaryOutcome
        """Trace a unary-unary call and record its status code on the span.

        The span gets the data keys "type", "method" and "code"; the value
        returned by ``continuation`` is passed back unchanged.
        """
        rpc_method = client_call_details.method
        span_name = "unary unary call to %s" % rpc_method

        with sentry_sdk.start_span(
            op=OP.GRPC_CLIENT, name=span_name, origin=SPAN_ORIGIN
        ) as span:
            span.set_data("type", "unary unary")
            span.set_data("method", rpc_method)

            # Inject the trace headers while the span is active, then continue
            # the call with the enriched details.
            traced_details = self._update_client_call_details_metadata_from_scope(
                client_call_details
            )
            outcome = continuation(traced_details, request)

            span.set_data("code", outcome.code().name)
            return outcome

    def intercept_unary_stream(self, continuation, client_call_details, request):
        # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], Union[Iterable[Any], UnaryStreamCall]], ClientCallDetails, Message) -> Union[Iterator[Message], Call]
        """Trace a unary-stream call.

        The span gets the data keys "type" and "method" only; the value
        returned by ``continuation`` is passed back unchanged.
        """
        rpc_method = client_call_details.method
        span_name = "unary stream call to %s" % rpc_method

        with sentry_sdk.start_span(
            op=OP.GRPC_CLIENT, name=span_name, origin=SPAN_ORIGIN
        ) as span:
            span.set_data("type", "unary stream")
            span.set_data("method", rpc_method)

            traced_details = self._update_client_call_details_metadata_from_scope(
                client_call_details
            )
            stream_call = continuation(
                traced_details, request
            )  # type: UnaryStreamCall

            # The status code is deliberately NOT recorded here: setting it via
            #   span.set_data("code", response.code().name)
            # on a unary-stream call leads to execution getting stuck.
            return stream_call

    @staticmethod
    def _update_client_call_details_metadata_from_scope(client_call_details):
        # type: (ClientCallDetails) -> ClientCallDetails
        """Return new call details with trace propagation headers appended.

        The existing metadata (if any) is copied into a fresh list, so the
        metadata object on the incoming ``client_call_details`` is not
        mutated. All other fields are carried over as-is.
        """
        # A missing or empty metadata value starts from an empty list.
        headers = list(client_call_details.metadata or ())

        scope = sentry_sdk.get_current_scope()
        headers.extend(
            (key, value) for key, value in scope.iter_trace_propagation_headers()
        )

        return grpc._interceptor._ClientCallDetails(
            method=client_call_details.method,
            timeout=client_call_details.timeout,
            metadata=headers,
            credentials=client_call_details.credentials,
            wait_for_ready=client_call_details.wait_for_ready,
            compression=client_call_details.compression,
        )
|