aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py
new file mode 100644
index 00000000..f9ef13e2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py
@@ -0,0 +1,168 @@
+import json
+
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
+from sentry_sdk.utils import (
+ AnnotatedValue,
+ capture_internal_exceptions,
+ event_from_exception,
+)
+
+from dramatiq.broker import Broker # type: ignore
+from dramatiq.message import Message # type: ignore
+from dramatiq.middleware import Middleware, default_middleware # type: ignore
+from dramatiq.errors import Retry # type: ignore
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from typing import Any, Callable, Dict, Optional, Union
+ from sentry_sdk._types import Event, Hint
+
+
+class DramatiqIntegration(Integration):
+ """
+ Dramatiq integration for Sentry
+
+ Please make sure that you call `sentry_sdk.init` *before* initializing
+ your broker, as it monkey patches `Broker.__init__`.
+
+ This integration was originally developed and maintained
+ by https://github.com/jacobsvante and later donated to the Sentry
+ project.
+ """
+
+ identifier = "dramatiq"
+
+ @staticmethod
+ def setup_once():
+ # type: () -> None
+ _patch_dramatiq_broker()
+
+
+def _patch_dramatiq_broker():
+ # type: () -> None
+ original_broker__init__ = Broker.__init__
+
+ def sentry_patched_broker__init__(self, *args, **kw):
+ # type: (Broker, *Any, **Any) -> None
+ integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
+
+ try:
+ middleware = kw.pop("middleware")
+ except KeyError:
+ # Unfortunately Broker and StubBroker allows middleware to be
+ # passed in as positional arguments, whilst RabbitmqBroker and
+ # RedisBroker does not.
+ if len(args) == 1:
+ middleware = args[0]
+ args = [] # type: ignore
+ else:
+ middleware = None
+
+ if middleware is None:
+ middleware = list(m() for m in default_middleware)
+ else:
+ middleware = list(middleware)
+
+ if integration is not None:
+ middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)]
+ middleware.insert(0, SentryMiddleware())
+
+ kw["middleware"] = middleware
+ original_broker__init__(self, *args, **kw)
+
+ Broker.__init__ = sentry_patched_broker__init__
+
+
+class SentryMiddleware(Middleware): # type: ignore[misc]
+ """
+ A Dramatiq middleware that automatically captures and sends
+ exceptions to Sentry.
+
+ This is automatically added to every instantiated broker via the
+ DramatiqIntegration.
+ """
+
+ def before_process_message(self, broker, message):
+ # type: (Broker, Message) -> None
+ integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
+ if integration is None:
+ return
+
+ message._scope_manager = sentry_sdk.new_scope()
+ message._scope_manager.__enter__()
+
+ scope = sentry_sdk.get_current_scope()
+ scope.transaction = message.actor_name
+ scope.set_extra("dramatiq_message_id", message.message_id)
+ scope.add_event_processor(_make_message_event_processor(message, integration))
+
+ def after_process_message(self, broker, message, *, result=None, exception=None):
+ # type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None
+ integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
+ if integration is None:
+ return
+
+ actor = broker.get_actor(message.actor_name)
+ throws = message.options.get("throws") or actor.options.get("throws")
+
+ try:
+ if (
+ exception is not None
+ and not (throws and isinstance(exception, throws))
+ and not isinstance(exception, Retry)
+ ):
+ event, hint = event_from_exception(
+ exception,
+ client_options=sentry_sdk.get_client().options,
+ mechanism={
+ "type": DramatiqIntegration.identifier,
+ "handled": False,
+ },
+ )
+ sentry_sdk.capture_event(event, hint=hint)
+ finally:
+ message._scope_manager.__exit__(None, None, None)
+
+
+def _make_message_event_processor(message, integration):
+ # type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]
+
+ def inner(event, hint):
+ # type: (Event, Hint) -> Optional[Event]
+ with capture_internal_exceptions():
+ DramatiqMessageExtractor(message).extract_into_event(event)
+
+ return event
+
+ return inner
+
+
+class DramatiqMessageExtractor:
+ def __init__(self, message):
+ # type: (Message) -> None
+ self.message_data = dict(message.asdict())
+
+ def content_length(self):
+ # type: () -> int
+ return len(json.dumps(self.message_data))
+
+ def extract_into_event(self, event):
+ # type: (Event) -> None
+ client = sentry_sdk.get_client()
+ if not client.is_active():
+ return
+
+ contexts = event.setdefault("contexts", {})
+ request_info = contexts.setdefault("dramatiq", {})
+ request_info["type"] = "dramatiq"
+
+ data = None # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]
+ if not request_body_within_bounds(client, self.content_length()):
+ data = AnnotatedValue.removed_because_over_size_limit()
+ else:
+ data = self.message_data
+
+ request_info["data"] = data