about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py
new file mode 100644
index 00000000..f9ef13e2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/dramatiq.py
@@ -0,0 +1,168 @@
+import json
+
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
+from sentry_sdk.utils import (
+    AnnotatedValue,
+    capture_internal_exceptions,
+    event_from_exception,
+)
+
+from dramatiq.broker import Broker  # type: ignore
+from dramatiq.message import Message  # type: ignore
+from dramatiq.middleware import Middleware, default_middleware  # type: ignore
+from dramatiq.errors import Retry  # type: ignore
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any, Callable, Dict, Optional, Union
+    from sentry_sdk._types import Event, Hint
+
+
+class DramatiqIntegration(Integration):
+    """
+    Dramatiq integration for Sentry
+
+    Please make sure that you call `sentry_sdk.init` *before* initializing
+    your broker, as it monkey patches `Broker.__init__`.
+
+    This integration was originally developed and maintained
+    by https://github.com/jacobsvante and later donated to the Sentry
+    project.
+    """
+
+    identifier = "dramatiq"
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        _patch_dramatiq_broker()
+
+
+def _patch_dramatiq_broker():
+    # type: () -> None
+    original_broker__init__ = Broker.__init__
+
+    def sentry_patched_broker__init__(self, *args, **kw):
+        # type: (Broker, *Any, **Any) -> None
+        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
+
+        try:
+            middleware = kw.pop("middleware")
+        except KeyError:
+            # Unfortunately Broker and StubBroker allows middleware to be
+            # passed in as positional arguments, whilst RabbitmqBroker and
+            # RedisBroker does not.
+            if len(args) == 1:
+                middleware = args[0]
+                args = []  # type: ignore
+            else:
+                middleware = None
+
+        if middleware is None:
+            middleware = list(m() for m in default_middleware)
+        else:
+            middleware = list(middleware)
+
+        if integration is not None:
+            middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)]
+            middleware.insert(0, SentryMiddleware())
+
+        kw["middleware"] = middleware
+        original_broker__init__(self, *args, **kw)
+
+    Broker.__init__ = sentry_patched_broker__init__
+
+
+class SentryMiddleware(Middleware):  # type: ignore[misc]
+    """
+    A Dramatiq middleware that automatically captures and sends
+    exceptions to Sentry.
+
+    This is automatically added to every instantiated broker via the
+    DramatiqIntegration.
+    """
+
+    def before_process_message(self, broker, message):
+        # type: (Broker, Message) -> None
+        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
+        if integration is None:
+            return
+
+        message._scope_manager = sentry_sdk.new_scope()
+        message._scope_manager.__enter__()
+
+        scope = sentry_sdk.get_current_scope()
+        scope.transaction = message.actor_name
+        scope.set_extra("dramatiq_message_id", message.message_id)
+        scope.add_event_processor(_make_message_event_processor(message, integration))
+
+    def after_process_message(self, broker, message, *, result=None, exception=None):
+        # type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None
+        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
+        if integration is None:
+            return
+
+        actor = broker.get_actor(message.actor_name)
+        throws = message.options.get("throws") or actor.options.get("throws")
+
+        try:
+            if (
+                exception is not None
+                and not (throws and isinstance(exception, throws))
+                and not isinstance(exception, Retry)
+            ):
+                event, hint = event_from_exception(
+                    exception,
+                    client_options=sentry_sdk.get_client().options,
+                    mechanism={
+                        "type": DramatiqIntegration.identifier,
+                        "handled": False,
+                    },
+                )
+                sentry_sdk.capture_event(event, hint=hint)
+        finally:
+            message._scope_manager.__exit__(None, None, None)
+
+
+def _make_message_event_processor(message, integration):
+    # type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]
+
+    def inner(event, hint):
+        # type: (Event, Hint) -> Optional[Event]
+        with capture_internal_exceptions():
+            DramatiqMessageExtractor(message).extract_into_event(event)
+
+        return event
+
+    return inner
+
+
+class DramatiqMessageExtractor:
+    def __init__(self, message):
+        # type: (Message) -> None
+        self.message_data = dict(message.asdict())
+
+    def content_length(self):
+        # type: () -> int
+        return len(json.dumps(self.message_data))
+
+    def extract_into_event(self, event):
+        # type: (Event) -> None
+        client = sentry_sdk.get_client()
+        if not client.is_active():
+            return
+
+        contexts = event.setdefault("contexts", {})
+        request_info = contexts.setdefault("dramatiq", {})
+        request_info["type"] = "dramatiq"
+
+        data = None  # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]
+        if not request_body_within_bounds(client, self.content_length()):
+            data = AnnotatedValue.removed_because_over_size_limit()
+        else:
+            data = self.message_data
+
+        request_info["data"] = data