aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py116
1 files changed, 116 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
new file mode 100644
index 00000000..5340a0b3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
@@ -0,0 +1,116 @@
+import sys
+
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.utils import (
+ capture_internal_exceptions,
+ exc_info_from_error,
+ single_exception_from_error_tuple,
+ walk_exception_chain,
+ event_hint_with_exc_info,
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from typing import Any
+ from typing import Optional
+
+ from sentry_sdk._types import ExcInfo, Event, Hint
+
+
+class SparkWorkerIntegration(Integration):
+ identifier = "spark_worker"
+
+ @staticmethod
+ def setup_once():
+ # type: () -> None
+ import pyspark.daemon as original_daemon
+
+ original_daemon.worker_main = _sentry_worker_main
+
+
+def _capture_exception(exc_info):
+ # type: (ExcInfo) -> None
+ client = sentry_sdk.get_client()
+
+ mechanism = {"type": "spark", "handled": False}
+
+ exc_info = exc_info_from_error(exc_info)
+
+ exc_type, exc_value, tb = exc_info
+ rv = []
+
+ # On Exception worker will call sys.exit(-1), so we can ignore SystemExit and similar errors
+ for exc_type, exc_value, tb in walk_exception_chain(exc_info):
+ if exc_type not in (SystemExit, EOFError, ConnectionResetError):
+ rv.append(
+ single_exception_from_error_tuple(
+ exc_type, exc_value, tb, client.options, mechanism
+ )
+ )
+
+ if rv:
+ rv.reverse()
+ hint = event_hint_with_exc_info(exc_info)
+ event = {"level": "error", "exception": {"values": rv}} # type: Event
+
+ _tag_task_context()
+
+ sentry_sdk.capture_event(event, hint=hint)
+
+
+def _tag_task_context():
+ # type: () -> None
+ from pyspark.taskcontext import TaskContext
+
+ scope = sentry_sdk.get_isolation_scope()
+
+ @scope.add_event_processor
+ def process_event(event, hint):
+ # type: (Event, Hint) -> Optional[Event]
+ with capture_internal_exceptions():
+ integration = sentry_sdk.get_client().get_integration(
+ SparkWorkerIntegration
+ )
+ task_context = TaskContext.get()
+
+ if integration is None or task_context is None:
+ return event
+
+ event.setdefault("tags", {}).setdefault(
+ "stageId", str(task_context.stageId())
+ )
+ event["tags"].setdefault("partitionId", str(task_context.partitionId()))
+ event["tags"].setdefault("attemptNumber", str(task_context.attemptNumber()))
+ event["tags"].setdefault("taskAttemptId", str(task_context.taskAttemptId()))
+
+ if task_context._localProperties:
+ if "sentry_app_name" in task_context._localProperties:
+ event["tags"].setdefault(
+ "app_name", task_context._localProperties["sentry_app_name"]
+ )
+ event["tags"].setdefault(
+ "application_id",
+ task_context._localProperties["sentry_application_id"],
+ )
+
+ if "callSite.short" in task_context._localProperties:
+ event.setdefault("extra", {}).setdefault(
+ "callSite", task_context._localProperties["callSite.short"]
+ )
+
+ return event
+
+
+def _sentry_worker_main(*args, **kwargs):
+ # type: (*Optional[Any], **Optional[Any]) -> None
+ import pyspark.worker as original_worker
+
+ try:
+ original_worker.main(*args, **kwargs)
+ except SystemExit:
+ if sentry_sdk.get_client().get_integration(SparkWorkerIntegration) is not None:
+ exc_info = sys.exc_info()
+ with capture_internal_exceptions():
+ _capture_exception(exc_info)