author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)

two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py  116
1 file changed, 116 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
new file mode 100644
index 00000000..5340a0b3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
@@ -0,0 +1,116 @@
+import sys
+
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.utils import (
+    capture_internal_exceptions,
+    exc_info_from_error,
+    single_exception_from_error_tuple,
+    walk_exception_chain,
+    event_hint_with_exc_info,
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any
+    from typing import Optional
+
+    from sentry_sdk._types import ExcInfo, Event, Hint
+
+
+class SparkWorkerIntegration(Integration):
+    identifier = "spark_worker"
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        import pyspark.daemon as original_daemon
+
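+        # Monkeypatch the PySpark daemon's worker entry point so every worker
+        # process runs inside the Sentry wrapper defined below.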
+        original_daemon.worker_main = _sentry_worker_main
+
+
+def _capture_exception(exc_info):
+    # type: (ExcInfo) -> None
+    client = sentry_sdk.get_client()
+
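+    # Mark captured exceptions as unhandled crashes originating in a Spark worker.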
+    mechanism = {"type": "spark", "handled": False}
+
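+    # Normalize the input (an exception instance or an exc_info tuple) into a
+    # (type, value, traceback) tuple.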
+    exc_info = exc_info_from_error(exc_info)
+
+    exc_type, exc_value, tb = exc_info
+    rv = []
+
+    # On exception the worker calls sys.exit(-1), so we can ignore SystemExit and similar errors
+    for exc_type, exc_value, tb in walk_exception_chain(exc_info):
+        if exc_type not in (SystemExit, EOFError, ConnectionResetError):
+            rv.append(
+                single_exception_from_error_tuple(
+                    exc_type, exc_value, tb, client.options, mechanism
+                )
+            )
+
+    if rv:
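+        # walk_exception_chain yields the newest exception first; Sentry expects
+        # the values list oldest-first, so reverse it.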
+        rv.reverse()
+        hint = event_hint_with_exc_info(exc_info)
+        event = {"level": "error", "exception": {"values": rv}}  # type: Event
+
+        _tag_task_context()
+
+        sentry_sdk.capture_event(event, hint=hint)
+
+
+def _tag_task_context():
+    # type: () -> None
+    from pyspark.taskcontext import TaskContext
+
+    scope = sentry_sdk.get_isolation_scope()
+
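+    # Register an event processor that tags every event captured from this
+    # worker with Spark task metadata.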
+    @scope.add_event_processor
+    def process_event(event, hint):
+        # type: (Event, Hint) -> Optional[Event]
+        with capture_internal_exceptions():
+            integration = sentry_sdk.get_client().get_integration(
+                SparkWorkerIntegration
+            )
+            task_context = TaskContext.get()
+
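+            # TaskContext.get() returns None when called outside an active task.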
+            if integration is None or task_context is None:
+                return event
+
+            event.setdefault("tags", {}).setdefault(
+                "stageId", str(task_context.stageId())
+            )
+            event["tags"].setdefault("partitionId", str(task_context.partitionId()))
+            event["tags"].setdefault("attemptNumber", str(task_context.attemptNumber()))
+            event["tags"].setdefault("taskAttemptId", str(task_context.taskAttemptId()))
+
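+            # These local properties are set on the driver by the companion
+            # SparkIntegration and propagated to workers by Spark.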
+            if task_context._localProperties:
+                if "sentry_app_name" in task_context._localProperties:
+                    event["tags"].setdefault(
+                        "app_name", task_context._localProperties["sentry_app_name"]
+                    )
+                    event["tags"].setdefault(
+                        "application_id",
+                        task_context._localProperties["sentry_application_id"],
+                    )
+
+                if "callSite.short" in task_context._localProperties:
+                    event.setdefault("extra", {}).setdefault(
+                        "callSite", task_context._localProperties["callSite.short"]
+                    )
+
+        return event
+
+
+def _sentry_worker_main(*args, **kwargs):
+    # type: (*Optional[Any], **Optional[Any]) -> None
+    import pyspark.worker as original_worker
+
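+    # Run the real PySpark worker. On failure it calls sys.exit(-1), so the
+    # original exception reaches us chained to the resulting SystemExit.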
+    try:
+        original_worker.main(*args, **kwargs)
+    except SystemExit:
+        if sentry_sdk.get_client().get_integration(SparkWorkerIntegration) is not None:
+            exc_info = sys.exc_info()
+            with capture_internal_exceptions():
+                _capture_exception(exc_info)
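
For context, enabling this integration means calling sentry_sdk.init with SparkWorkerIntegration inside each worker process, typically from a custom daemon module selected via Spark's spark.python.daemon.module setting. A minimal sketch, not part of this diff (the DSN is a placeholder):

    import sentry_sdk
    from sentry_sdk.integrations.spark import SparkWorkerIntegration

    # Placeholder DSN; substitute your project's real DSN.
    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
        integrations=[SparkWorkerIntegration()],
    )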