about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/arq.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/arq.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/arq.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/arq.py246
1 files changed, 246 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/arq.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/arq.py
new file mode 100644
index 00000000..1ea8e32f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/arq.py
@@ -0,0 +1,246 @@
+import sys
+
+import sentry_sdk
+from sentry_sdk.consts import OP, SPANSTATUS
+from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
+from sentry_sdk.integrations.logging import ignore_logger
+from sentry_sdk.scope import should_send_default_pii
+from sentry_sdk.tracing import Transaction, TransactionSource
+from sentry_sdk.utils import (
+    capture_internal_exceptions,
+    ensure_integration_enabled,
+    event_from_exception,
+    SENSITIVE_DATA_SUBSTITUTE,
+    parse_version,
+    reraise,
+)
+
+try:
+    import arq.worker
+    from arq.version import VERSION as ARQ_VERSION
+    from arq.connections import ArqRedis
+    from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker
+except ImportError:
+    raise DidNotEnable("Arq is not installed")
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any, Dict, Optional, Union
+
+    from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint
+
+    from arq.cron import CronJob
+    from arq.jobs import Job
+    from arq.typing import WorkerCoroutine
+    from arq.worker import Function
+
+ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob)
+
+
+class ArqIntegration(Integration):
+    identifier = "arq"
+    origin = f"auto.queue.{identifier}"
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+
+        try:
+            if isinstance(ARQ_VERSION, str):
+                version = parse_version(ARQ_VERSION)
+            else:
+                version = ARQ_VERSION.version[:2]
+
+        except (TypeError, ValueError):
+            version = None
+
+        _check_minimum_version(ArqIntegration, version)
+
+        patch_enqueue_job()
+        patch_run_job()
+        patch_create_worker()
+
+        ignore_logger("arq.worker")
+
+
+def patch_enqueue_job():
+    # type: () -> None
+    old_enqueue_job = ArqRedis.enqueue_job
+    original_kwdefaults = old_enqueue_job.__kwdefaults__
+
+    async def _sentry_enqueue_job(self, function, *args, **kwargs):
+        # type: (ArqRedis, str, *Any, **Any) -> Optional[Job]
+        integration = sentry_sdk.get_client().get_integration(ArqIntegration)
+        if integration is None:
+            return await old_enqueue_job(self, function, *args, **kwargs)
+
+        with sentry_sdk.start_span(
+            op=OP.QUEUE_SUBMIT_ARQ, name=function, origin=ArqIntegration.origin
+        ):
+            return await old_enqueue_job(self, function, *args, **kwargs)
+
+    _sentry_enqueue_job.__kwdefaults__ = original_kwdefaults
+    ArqRedis.enqueue_job = _sentry_enqueue_job
+
+
+def patch_run_job():
+    # type: () -> None
+    old_run_job = Worker.run_job
+
+    async def _sentry_run_job(self, job_id, score):
+        # type: (Worker, str, int) -> None
+        integration = sentry_sdk.get_client().get_integration(ArqIntegration)
+        if integration is None:
+            return await old_run_job(self, job_id, score)
+
+        with sentry_sdk.isolation_scope() as scope:
+            scope._name = "arq"
+            scope.clear_breadcrumbs()
+
+            transaction = Transaction(
+                name="unknown arq task",
+                status="ok",
+                op=OP.QUEUE_TASK_ARQ,
+                source=TransactionSource.TASK,
+                origin=ArqIntegration.origin,
+            )
+
+            with sentry_sdk.start_transaction(transaction):
+                return await old_run_job(self, job_id, score)
+
+    Worker.run_job = _sentry_run_job
+
+
+def _capture_exception(exc_info):
+    # type: (ExcInfo) -> None
+    scope = sentry_sdk.get_current_scope()
+
+    if scope.transaction is not None:
+        if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS:
+            scope.transaction.set_status(SPANSTATUS.ABORTED)
+            return
+
+        scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
+
+    event, hint = event_from_exception(
+        exc_info,
+        client_options=sentry_sdk.get_client().options,
+        mechanism={"type": ArqIntegration.identifier, "handled": False},
+    )
+    sentry_sdk.capture_event(event, hint=hint)
+
+
+def _make_event_processor(ctx, *args, **kwargs):
+    # type: (Dict[Any, Any], *Any, **Any) -> EventProcessor
+    def event_processor(event, hint):
+        # type: (Event, Hint) -> Optional[Event]
+
+        with capture_internal_exceptions():
+            scope = sentry_sdk.get_current_scope()
+            if scope.transaction is not None:
+                scope.transaction.name = ctx["job_name"]
+                event["transaction"] = ctx["job_name"]
+
+            tags = event.setdefault("tags", {})
+            tags["arq_task_id"] = ctx["job_id"]
+            tags["arq_task_retry"] = ctx["job_try"] > 1
+            extra = event.setdefault("extra", {})
+            extra["arq-job"] = {
+                "task": ctx["job_name"],
+                "args": (
+                    args if should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
+                ),
+                "kwargs": (
+                    kwargs if should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
+                ),
+                "retry": ctx["job_try"],
+            }
+
+        return event
+
+    return event_processor
+
+
+def _wrap_coroutine(name, coroutine):
+    # type: (str, WorkerCoroutine) -> WorkerCoroutine
+
+    async def _sentry_coroutine(ctx, *args, **kwargs):
+        # type: (Dict[Any, Any], *Any, **Any) -> Any
+        integration = sentry_sdk.get_client().get_integration(ArqIntegration)
+        if integration is None:
+            return await coroutine(ctx, *args, **kwargs)
+
+        sentry_sdk.get_isolation_scope().add_event_processor(
+            _make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
+        )
+
+        try:
+            result = await coroutine(ctx, *args, **kwargs)
+        except Exception:
+            exc_info = sys.exc_info()
+            _capture_exception(exc_info)
+            reraise(*exc_info)
+
+        return result
+
+    return _sentry_coroutine
+
+
+def patch_create_worker():
+    # type: () -> None
+    old_create_worker = arq.worker.create_worker
+
+    @ensure_integration_enabled(ArqIntegration, old_create_worker)
+    def _sentry_create_worker(*args, **kwargs):
+        # type: (*Any, **Any) -> Worker
+        settings_cls = args[0]
+
+        if isinstance(settings_cls, dict):
+            if "functions" in settings_cls:
+                settings_cls["functions"] = [
+                    _get_arq_function(func)
+                    for func in settings_cls.get("functions", [])
+                ]
+            if "cron_jobs" in settings_cls:
+                settings_cls["cron_jobs"] = [
+                    _get_arq_cron_job(cron_job)
+                    for cron_job in settings_cls.get("cron_jobs", [])
+                ]
+
+        if hasattr(settings_cls, "functions"):
+            settings_cls.functions = [
+                _get_arq_function(func) for func in settings_cls.functions
+            ]
+        if hasattr(settings_cls, "cron_jobs"):
+            settings_cls.cron_jobs = [
+                _get_arq_cron_job(cron_job) for cron_job in settings_cls.cron_jobs
+            ]
+
+        if "functions" in kwargs:
+            kwargs["functions"] = [
+                _get_arq_function(func) for func in kwargs.get("functions", [])
+            ]
+        if "cron_jobs" in kwargs:
+            kwargs["cron_jobs"] = [
+                _get_arq_cron_job(cron_job) for cron_job in kwargs.get("cron_jobs", [])
+            ]
+
+        return old_create_worker(*args, **kwargs)
+
+    arq.worker.create_worker = _sentry_create_worker
+
+
+def _get_arq_function(func):
+    # type: (Union[str, Function, WorkerCoroutine]) -> Function
+    arq_func = arq.worker.func(func)
+    arq_func.coroutine = _wrap_coroutine(arq_func.name, arq_func.coroutine)
+
+    return arq_func
+
+
+def _get_arq_cron_job(cron_job):
+    # type: (CronJob) -> CronJob
+    cron_job.coroutine = _wrap_coroutine(cron_job.name, cron_job.coroutine)
+
+    return cron_job