author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py  315
1 file changed, 315 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
new file mode 100644
index 00000000..fac98535
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
@@ -0,0 +1,315 @@
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any
+    from typing import Optional
+
+    from sentry_sdk._types import Event, Hint
+    from pyspark import SparkContext
+
+
+class SparkIntegration(Integration):
+    identifier = "spark"
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        _setup_sentry_tracing()
+
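+# Usage sketch (the DSN below is a placeholder): the integration is enabled on
+# the driver by passing it to `sentry_sdk.init`, e.g.
+#
+#     import sentry_sdk
+#     from sentry_sdk.integrations.spark import SparkIntegration
+#
+#     sentry_sdk.init(
+#         dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
+#         integrations=[SparkIntegration()],
+#     )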
+
+def _set_app_properties():
+    # type: () -> None
+    """
+    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
+    This allows worker integration to have access to app_name and application_id.
+    """
+    from pyspark import SparkContext
+
+    spark_context = SparkContext._active_spark_context
+    if spark_context:
+        spark_context.setLocalProperty(
+            "sentry_app_name",
+            spark_context.appName,
+        )
+        spark_context.setLocalProperty(
+            "sentry_application_id",
+            spark_context.applicationId,
+        )
+
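+# A sketch of the worker side (not part of this module): Spark propagates
+# these local properties with each task, where they can be read back,
+# assuming the code runs inside a task so TaskContext.get() is not None:
+#
+#     from pyspark import TaskContext
+#
+#     app_name = TaskContext.get().getLocalProperty("sentry_app_name")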
+
+def _start_sentry_listener(sc):
+    # type: (SparkContext) -> None
+    """
+    Start java gateway server to add custom `SparkListener`
+    """
+    from pyspark.java_gateway import ensure_callback_server_started
+
+    gw = sc._gateway
+    ensure_callback_server_started(gw)
+    listener = SentryListener()
+    sc._jsc.sc().addSparkListener(listener)
+
+
+def _add_event_processor(sc):
+    # type: (SparkContext) -> None
+    scope = sentry_sdk.get_isolation_scope()
+
+    @scope.add_event_processor
+    def process_event(event, hint):
+        # type: (Event, Hint) -> Optional[Event]
+        with capture_internal_exceptions():
+            if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
+                return event
+
+            if sc._active_spark_context is None:
+                return event
+
+            event.setdefault("user", {}).setdefault("id", sc.sparkUser())
+
+            event.setdefault("tags", {}).setdefault(
+                "executor.id", sc._conf.get("spark.executor.id")
+            )
+            event["tags"].setdefault(
+                "spark-submit.deployMode",
+                sc._conf.get("spark.submit.deployMode"),
+            )
+            event["tags"].setdefault("driver.host", sc._conf.get("spark.driver.host"))
+            event["tags"].setdefault("driver.port", sc._conf.get("spark.driver.port"))
+            event["tags"].setdefault("spark_version", sc.version)
+            event["tags"].setdefault("app_name", sc.appName)
+            event["tags"].setdefault("application_id", sc.applicationId)
+            event["tags"].setdefault("master", sc.master)
+            event["tags"].setdefault("spark_home", sc.sparkHome)
+
+            event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl)
+
+        return event
+
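+# The processor above enriches every outgoing event with Spark metadata: the
+# Spark user as user.id, tags such as app_name, application_id, spark_version,
+# and master, and the Spark UI address under extra["web_url"].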
+
+def _activate_integration(sc):
+    # type: (SparkContext) -> None
+
+    _start_sentry_listener(sc)
+    _set_app_properties()
+    _add_event_processor(sc)
+
+
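+# Patch SparkContext._do_init so that contexts created *after*
+# sentry_sdk.init() also get the listener, properties, and event processor.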
+def _patch_spark_context_init():
+    # type: () -> None
+    from pyspark import SparkContext
+
+    spark_context_init = SparkContext._do_init
+
+    @ensure_integration_enabled(SparkIntegration, spark_context_init)
+    def _sentry_patched_spark_context_init(self, *args, **kwargs):
+        # type: (SparkContext, *Any, **Any) -> Optional[Any]
+        rv = spark_context_init(self, *args, **kwargs)
+        _activate_integration(self)
+        return rv
+
+    SparkContext._do_init = _sentry_patched_spark_context_init
+
+
+def _setup_sentry_tracing():
+    # type: () -> None
+    from pyspark import SparkContext
+
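+    # Attach to an already-running SparkContext immediately; otherwise patch
+    # SparkContext so the integration activates when one is created later.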
+    if SparkContext._active_spark_context is not None:
+        _activate_integration(SparkContext._active_spark_context)
+        return
+    _patch_spark_context_init()
+
+
+class SparkListener:
+    def onApplicationEnd(self, applicationEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onApplicationStart(self, applicationStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockManagerAdded(self, blockManagerAdded):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockManagerRemoved(self, blockManagerRemoved):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockUpdated(self, blockUpdated):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onEnvironmentUpdate(self, environmentUpdate):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorAdded(self, executorAdded):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorBlacklisted(self, executorBlacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorBlacklistedForStage(  # noqa: N802
+        self, executorBlacklistedForStage  # noqa: N803
+    ):
+        # type: (Any) -> None
+        pass
+
+    def onExecutorMetricsUpdate(self, executorMetricsUpdate):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorRemoved(self, executorRemoved):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onJobEnd(self, jobEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onJobStart(self, jobStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeBlacklisted(self, nodeBlacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeBlacklistedForStage(self, nodeBlacklistedForStage):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeUnblacklisted(self, nodeUnblacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onOtherEvent(self, event):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onSpeculativeTaskSubmitted(self, speculativeTask):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskEnd(self, taskEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskGettingResult(self, taskGettingResult):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskStart(self, taskStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onUnpersistRDD(self, unpersistRDD):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
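+    # py4j convention: an inner `Java` class with an `implements` list marks
+    # this Python object as an implementation of the given Java interface, so
+    # the JVM can invoke the callbacks above through the callback server.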
+    class Java:
+        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
+
+
+class SentryListener(SparkListener):
+    def _add_breadcrumb(
+        self,
+        level,  # type: str
+        message,  # type: str
+        data=None,  # type: Optional[dict[str, Any]]
+    ):
+        # type: (...) -> None
+        sentry_sdk.get_isolation_scope().add_breadcrumb(
+            level=level, message=message, data=data
+        )
+
+    def onJobStart(self, jobStart):  # noqa: N802,N803
+        # type: (Any) -> None
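+        # Each job starts with a fresh breadcrumb trail.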
+        sentry_sdk.get_isolation_scope().clear_breadcrumbs()
+
+        message = "Job {} Started".format(jobStart.jobId())
+        self._add_breadcrumb(level="info", message=message)
+        _set_app_properties()
+
+    def onJobEnd(self, jobEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        level = ""
+        message = ""
+        data = {"result": jobEnd.jobResult().toString()}
+
+        if jobEnd.jobResult().toString() == "JobSucceeded":
+            level = "info"
+            message = "Job {} Ended".format(jobEnd.jobId())
+        else:
+            level = "warning"
+            message = "Job {} Failed".format(jobEnd.jobId())
+
+        self._add_breadcrumb(level=level, message=message, data=data)
+
+    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
+        # type: (Any) -> None
+        stage_info = stageSubmitted.stageInfo()
+        message = "Stage {} Submitted".format(stage_info.stageId())
+
+        data = {"name": stage_info.name()}
+        attempt_id = _get_attempt_id(stage_info)
+        if attempt_id is not None:
+            data["attemptId"] = attempt_id
+
+        self._add_breadcrumb(level="info", message=message, data=data)
+        _set_app_properties()
+
+    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
+        # type: (Any) -> None
+        from py4j.protocol import Py4JJavaError  # type: ignore
+
+        stage_info = stageCompleted.stageInfo()
+        message = ""
+        level = ""
+
+        data = {"name": stage_info.name()}
+        attempt_id = _get_attempt_id(stage_info)
+        if attempt_id is not None:
+            data["attemptId"] = attempt_id
+
+        # stageInfo.failureReason() returns a Scala Option; calling .get() on
+        # an empty Option (i.e. the stage succeeded) raises a Py4JJavaError.
+        try:
+            data["reason"] = stage_info.failureReason().get()
+            message = "Stage {} Failed".format(stage_info.stageId())
+            level = "warning"
+        except Py4JJavaError:
+            message = "Stage {} Completed".format(stage_info.stageId())
+            level = "info"
+
+        self._add_breadcrumb(level=level, message=message, data=data)
+
+
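+# Older Spark releases expose StageInfo.attemptId(), newer ones
+# StageInfo.attemptNumber(); try both and fall back to None.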
+def _get_attempt_id(stage_info):
+    # type: (Any) -> Optional[int]
+    try:
+        return stage_info.attemptId()
+    except Exception:
+        pass
+
+    try:
+        return stage_info.attemptNumber()
+    except Exception:
+        pass
+
+    return None