1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
import inspect
import sys
import sentry_sdk
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
from sentry_sdk.tracing import TransactionSource
from sentry_sdk.utils import (
event_from_exception,
logger,
package_version,
qualname_from_function,
reraise,
)
try:
import ray # type: ignore[import-not-found]
except ImportError:
raise DidNotEnable("Ray not installed.")
import functools
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Optional
from sentry_sdk.utils import ExcInfo
def _check_sentry_initialized():
    # type: () -> None
    """
    Emit a debug log line when the current Sentry client is not active.

    Called at the start of every wrapped Ray task so that a missing
    ``sentry_sdk.init()`` in the worker process shows up in the SDK's
    debug log. Nothing is raised; the task continues either way.
    """
    client_is_active = sentry_sdk.get_client().is_active()
    if not client_is_active:
        logger.debug(
            "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
        )
def _patch_ray_remote():
    # type: () -> None
    """
    Replace ``ray.remote`` with a wrapper that traces Ray tasks.

    The wrapper instruments plain functions (Ray Tasks) in two places:

    * on the worker side, the task body runs inside a transaction that
      continues the trace received through the ``_tracing`` keyword;
    * on the client side, ``.remote(...)`` runs inside a span and injects
      the current trace propagation headers as ``_tracing``.

    Classes (Ray Actors) are passed to the original ``ray.remote``
    untouched. Both call forms are handled: ``ray.remote(f)`` and
    ``ray.remote(**options)(f)``.
    """
    old_remote = ray.remote

    def _instrument(f, make_remote):
        # type: (Callable[..., Any], Callable[[Any], Any]) -> Any
        # ``make_remote`` applies the original ``ray.remote`` (with whatever
        # options the caller supplied) to the object it is given.
        if inspect.isclass(f):
            # Ray Actors
            # (https://docs.ray.io/en/latest/ray-core/actors.html)
            # are not supported
            # (Only Ray Tasks are supported)
            return make_remote(f)

        def _f(*f_args, _tracing=None, **f_kwargs):
            # type: (Any, Optional[dict[str, Any]], Any) -> Any
            """
            Ray Worker
            """
            _check_sentry_initialized()

            transaction = sentry_sdk.continue_trace(
                _tracing or {},
                op=OP.QUEUE_TASK_RAY,
                name=qualname_from_function(f),
                origin=RayIntegration.origin,
                source=TransactionSource.TASK,
            )

            with sentry_sdk.start_transaction(transaction) as transaction:
                try:
                    result = f(*f_args, **f_kwargs)
                    transaction.set_status(SPANSTATUS.OK)
                except Exception:
                    transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
                    exc_info = sys.exc_info()
                    _capture_exception(exc_info)
                    reraise(*exc_info)

                return result

        rv = make_remote(_f)
        old_remote_method = rv.remote

        def _remote_method_with_header_propagation(*args, **kwargs):
            # type: (*Any, **Any) -> Any
            """
            Ray Client
            """
            with sentry_sdk.start_span(
                op=OP.QUEUE_SUBMIT_RAY,
                name=qualname_from_function(f),
                origin=RayIntegration.origin,
            ) as span:
                # Snapshot the propagation headers of the current scope so
                # the worker-side wrapper can continue this trace.
                tracing = dict(
                    sentry_sdk.get_current_scope().iter_trace_propagation_headers()
                )
                try:
                    result = old_remote_method(*args, **kwargs, _tracing=tracing)
                    span.set_status(SPANSTATUS.OK)
                except Exception:
                    span.set_status(SPANSTATUS.INTERNAL_ERROR)
                    exc_info = sys.exc_info()
                    _capture_exception(exc_info)
                    reraise(*exc_info)

                return result

        rv.remote = _remote_method_with_header_propagation

        return rv

    @functools.wraps(old_remote)
    def new_remote(f=None, *args, **kwargs):
        # type: (Optional[Callable[..., Any]], *Any, **Any) -> Callable[..., Any]
        if f is None:
            # ``@ray.remote(num_cpus=2)`` form: only options were given.
            # Resolve them with the original ``ray.remote`` right away (so
            # invalid options still fail at this call) and hand back a
            # decorator that instruments whatever it is applied to.
            configured_remote = old_remote(*args, **kwargs)

            def _decorator(user_f):
                # type: (Callable[..., Any]) -> Any
                return _instrument(user_f, configured_remote)

            return _decorator

        # ``@ray.remote`` / ``ray.remote(f, ...)`` form. Keyword options
        # must be forwarded with ``**``; unpacking them with a single ``*``
        # would pass the option *names* as extra positional arguments.
        return _instrument(f, lambda target: old_remote(target, *args, **kwargs))

    ray.remote = new_remote
def _capture_exception(exc_info, **kwargs):
    # type: (ExcInfo, **Any) -> None
    """
    Build a Sentry event from ``exc_info`` and send it.

    The event is marked as unhandled and attributed to the Ray
    integration through its mechanism. Extra keyword arguments are
    accepted but not used.
    """
    options = sentry_sdk.get_client().options
    mechanism = {
        "handled": False,
        "type": RayIntegration.identifier,
    }
    event, hint = event_from_exception(
        exc_info, client_options=options, mechanism=mechanism
    )
    sentry_sdk.capture_event(event, hint=hint)
class RayIntegration(Integration):
    """Sentry integration that instruments ``ray.remote``."""

    # Name the integration is registered under; also used as the exception
    # mechanism type for events captured by this module.
    identifier = "ray"
    # Origin attached to the spans and transactions created by this module.
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once():
        # type: () -> None
        """
        Install the integration.

        Passes the installed ``ray`` package version to
        ``_check_minimum_version`` and then patches ``ray.remote``.
        """
        _check_minimum_version(RayIntegration, package_version("ray"))
        _patch_ray_remote()
|