Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark')
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/__init__.py        4
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py  315
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py  116
3 files changed, 435 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/__init__.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/__init__.py
new file mode 100644
index 00000000..10d94163
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/__init__.py
@@ -0,0 +1,4 @@
+from sentry_sdk.integrations.spark.spark_driver import SparkIntegration
+from sentry_sdk.integrations.spark.spark_worker import SparkWorkerIntegration
+
+__all__ = ["SparkIntegration", "SparkWorkerIntegration"]
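The package exposes two entry points: SparkIntegration for the driver and SparkWorkerIntegration for executors. As a usage sketch (not part of the diff; the DSN is a placeholder), the driver-side setup is an ordinary sentry_sdk.init call:

import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration

# Initialize Sentry before creating the SparkContext so the patched
# SparkContext._do_init (see spark_driver.py below) can attach the
# listener and event processor.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    integrations=[SparkIntegration()],
)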
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
new file mode 100644
index 00000000..fac98535
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
@@ -0,0 +1,315 @@
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any
+    from typing import Optional
+
+    from sentry_sdk._types import Event, Hint
+    from pyspark import SparkContext
+
+
+class SparkIntegration(Integration):
+    identifier = "spark"
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        _setup_sentry_tracing()
+
+
+def _set_app_properties():
+    # type: () -> None
+    """
+    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
+    This allows worker integration to have access to app_name and application_id.
+    """
+    from pyspark import SparkContext
+
+    spark_context = SparkContext._active_spark_context
+    if spark_context:
+        spark_context.setLocalProperty(
+            "sentry_app_name",
+            spark_context.appName,
+        )
+        spark_context.setLocalProperty(
+            "sentry_application_id",
+            spark_context.applicationId,
+        )
+
+
+def _start_sentry_listener(sc):
+    # type: (SparkContext) -> None
+    """
+    Start java gateway server to add custom `SparkListener`
+    """
+    from pyspark.java_gateway import ensure_callback_server_started
+
+    gw = sc._gateway
+    ensure_callback_server_started(gw)
+    listener = SentryListener()
+    sc._jsc.sc().addSparkListener(listener)
+
+
+def _add_event_processor(sc):
+    # type: (SparkContext) -> None
+    scope = sentry_sdk.get_isolation_scope()
+
+    @scope.add_event_processor
+    def process_event(event, hint):
+        # type: (Event, Hint) -> Optional[Event]
+        with capture_internal_exceptions():
+            if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
+                return event
+
+            if sc._active_spark_context is None:
+                return event
+
+            event.setdefault("user", {}).setdefault("id", sc.sparkUser())
+
+            event.setdefault("tags", {}).setdefault(
+                "executor.id", sc._conf.get("spark.executor.id")
+            )
+            event["tags"].setdefault(
+                "spark-submit.deployMode",
+                sc._conf.get("spark.submit.deployMode"),
+            )
+            event["tags"].setdefault("driver.host", sc._conf.get("spark.driver.host"))
+            event["tags"].setdefault("driver.port", sc._conf.get("spark.driver.port"))
+            event["tags"].setdefault("spark_version", sc.version)
+            event["tags"].setdefault("app_name", sc.appName)
+            event["tags"].setdefault("application_id", sc.applicationId)
+            event["tags"].setdefault("master", sc.master)
+            event["tags"].setdefault("spark_home", sc.sparkHome)
+
+            event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl)
+
+        return event
+
+
+def _activate_integration(sc):
+    # type: (SparkContext) -> None
+
+    _start_sentry_listener(sc)
+    _set_app_properties()
+    _add_event_processor(sc)
+
+
+def _patch_spark_context_init():
+    # type: () -> None
+    from pyspark import SparkContext
+
+    spark_context_init = SparkContext._do_init
+
+    @ensure_integration_enabled(SparkIntegration, spark_context_init)
+    def _sentry_patched_spark_context_init(self, *args, **kwargs):
+        # type: (SparkContext, *Any, **Any) -> Optional[Any]
+        rv = spark_context_init(self, *args, **kwargs)
+        _activate_integration(self)
+        return rv
+
+    SparkContext._do_init = _sentry_patched_spark_context_init
+
+
+def _setup_sentry_tracing():
+    # type: () -> None
+    from pyspark import SparkContext
+
+    if SparkContext._active_spark_context is not None:
+        _activate_integration(SparkContext._active_spark_context)
+        return
+    _patch_spark_context_init()
+
+
+class SparkListener:
+    def onApplicationEnd(self, applicationEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onApplicationStart(self, applicationStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockManagerAdded(self, blockManagerAdded):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockManagerRemoved(self, blockManagerRemoved):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockUpdated(self, blockUpdated):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onEnvironmentUpdate(self, environmentUpdate):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorAdded(self, executorAdded):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorBlacklisted(self, executorBlacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorBlacklistedForStage(  # noqa: N802
+        self, executorBlacklistedForStage  # noqa: N803
+    ):
+        # type: (Any) -> None
+        pass
+
+    def onExecutorMetricsUpdate(self, executorMetricsUpdate):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorRemoved(self, executorRemoved):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onJobEnd(self, jobEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onJobStart(self, jobStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeBlacklisted(self, nodeBlacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeBlacklistedForStage(self, nodeBlacklistedForStage):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeUnblacklisted(self, nodeUnblacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onOtherEvent(self, event):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onSpeculativeTaskSubmitted(self, speculativeTask):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskEnd(self, taskEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskGettingResult(self, taskGettingResult):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskStart(self, taskStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onUnpersistRDD(self, unpersistRDD):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    class Java:
+        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
+
+
+class SentryListener(SparkListener):
+    def _add_breadcrumb(
+        self,
+        level,  # type: str
+        message,  # type: str
+        data=None,  # type: Optional[dict[str, Any]]
+    ):
+        # type: (...) -> None
+        sentry_sdk.get_isolation_scope().add_breadcrumb(
+            level=level, message=message, data=data
+        )
+
+    def onJobStart(self, jobStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        sentry_sdk.get_isolation_scope().clear_breadcrumbs()
+
+        message = "Job {} Started".format(jobStart.jobId())
+        self._add_breadcrumb(level="info", message=message)
+        _set_app_properties()
+
+    def onJobEnd(self, jobEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        level = ""
+        message = ""
+        data = {"result": jobEnd.jobResult().toString()}
+
+        if jobEnd.jobResult().toString() == "JobSucceeded":
+            level = "info"
+            message = "Job {} Ended".format(jobEnd.jobId())
+        else:
+            level = "warning"
+            message = "Job {} Failed".format(jobEnd.jobId())
+
+        self._add_breadcrumb(level=level, message=message, data=data)
+
+    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
+        # type: (Any) -> None
+        stage_info = stageSubmitted.stageInfo()
+        message = "Stage {} Submitted".format(stage_info.stageId())
+
+        data = {"name": stage_info.name()}
+        attempt_id = _get_attempt_id(stage_info)
+        if attempt_id is not None:
+            data["attemptId"] = attempt_id
+
+        self._add_breadcrumb(level="info", message=message, data=data)
+        _set_app_properties()
+
+    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
+        # type: (Any) -> None
+        from py4j.protocol import Py4JJavaError  # type: ignore
+
+        stage_info = stageCompleted.stageInfo()
+        message = ""
+        level = ""
+
+        data = {"name": stage_info.name()}
+        attempt_id = _get_attempt_id(stage_info)
+        if attempt_id is not None:
+            data["attemptId"] = attempt_id
+
+        # Wrap in try/except because stageInfo.failureReason() returns a Scala
+        # Option; calling .get() raises when the stage completed successfully.
+        try:
+            data["reason"] = stage_info.failureReason().get()
+            message = "Stage {} Failed".format(stage_info.stageId())
+            level = "warning"
+        except Py4JJavaError:
+            message = "Stage {} Completed".format(stage_info.stageId())
+            level = "info"
+
+        self._add_breadcrumb(level=level, message=message, data=data)
+
+
+def _get_attempt_id(stage_info):
+    # type: (Any) -> Optional[int]
+    try:
+        return stage_info.attemptId()
+    except Exception:
+        pass
+
+    try:
+        return stage_info.attemptNumber()
+    except Exception:
+        pass
+
+    return None
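_setup_sentry_tracing either activates the integration on an already running SparkContext or patches SparkContext._do_init so activation happens when the context is created. A minimal driver sketch, assuming a local master and an illustrative app name (neither appears in this diff):

import sentry_sdk
from pyspark import SparkConf, SparkContext
from sentry_sdk.integrations.spark import SparkIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    integrations=[SparkIntegration()],
)

# Creating the context runs the patched _do_init, which registers the
# SentryListener (job/stage breadcrumbs), sets the sentry_app_name and
# sentry_application_id local properties, and installs the event processor
# that tags events with app_name, application_id, master, and so on.
conf = SparkConf().setAppName("example-app").setMaster("local[2]")
sc = SparkContext(conf=conf)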
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
new file mode 100644
index 00000000..5340a0b3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
@@ -0,0 +1,116 @@
+import sys
+
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.utils import (
+    capture_internal_exceptions,
+    exc_info_from_error,
+    single_exception_from_error_tuple,
+    walk_exception_chain,
+    event_hint_with_exc_info,
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any
+    from typing import Optional
+
+    from sentry_sdk._types import ExcInfo, Event, Hint
+
+
+class SparkWorkerIntegration(Integration):
+    identifier = "spark_worker"
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        import pyspark.daemon as original_daemon
+
+        original_daemon.worker_main = _sentry_worker_main
+
+
+def _capture_exception(exc_info):
+    # type: (ExcInfo) -> None
+    client = sentry_sdk.get_client()
+
+    mechanism = {"type": "spark", "handled": False}
+
+    exc_info = exc_info_from_error(exc_info)
+
+    exc_type, exc_value, tb = exc_info
+    rv = []
+
+    # On exception, the worker calls sys.exit(-1), so we can ignore SystemExit
+    # and similar errors.
+    for exc_type, exc_value, tb in walk_exception_chain(exc_info):
+        if exc_type not in (SystemExit, EOFError, ConnectionResetError):
+            rv.append(
+                single_exception_from_error_tuple(
+                    exc_type, exc_value, tb, client.options, mechanism
+                )
+            )
+
+    if rv:
+        rv.reverse()
+        hint = event_hint_with_exc_info(exc_info)
+        event = {"level": "error", "exception": {"values": rv}}  # type: Event
+
+        _tag_task_context()
+
+        sentry_sdk.capture_event(event, hint=hint)
+
+
+def _tag_task_context():
+    # type: () -> None
+    from pyspark.taskcontext import TaskContext
+
+    scope = sentry_sdk.get_isolation_scope()
+
+    @scope.add_event_processor
+    def process_event(event, hint):
+        # type: (Event, Hint) -> Optional[Event]
+        with capture_internal_exceptions():
+            integration = sentry_sdk.get_client().get_integration(
+                SparkWorkerIntegration
+            )
+            task_context = TaskContext.get()
+
+            if integration is None or task_context is None:
+                return event
+
+            event.setdefault("tags", {}).setdefault(
+                "stageId", str(task_context.stageId())
+            )
+            event["tags"].setdefault("partitionId", str(task_context.partitionId()))
+            event["tags"].setdefault("attemptNumber", str(task_context.attemptNumber()))
+            event["tags"].setdefault("taskAttemptId", str(task_context.taskAttemptId()))
+
+            if task_context._localProperties:
+                if "sentry_app_name" in task_context._localProperties:
+                    event["tags"].setdefault(
+                        "app_name", task_context._localProperties["sentry_app_name"]
+                    )
+                    event["tags"].setdefault(
+                        "application_id",
+                        task_context._localProperties["sentry_application_id"],
+                    )
+
+                if "callSite.short" in task_context._localProperties:
+                    event.setdefault("extra", {}).setdefault(
+                        "callSite", task_context._localProperties["callSite.short"]
+                    )
+
+        return event
+
+
+def _sentry_worker_main(*args, **kwargs):
+    # type: (*Optional[Any], **Optional[Any]) -> None
+    import pyspark.worker as original_worker
+
+    try:
+        original_worker.main(*args, **kwargs)
+    except SystemExit:
+        if sentry_sdk.get_client().get_integration(SparkWorkerIntegration) is not None:
+            exc_info = sys.exc_info()
+            with capture_internal_exceptions():
+                _capture_exception(exc_info)
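SparkWorkerIntegration.setup_once replaces pyspark.daemon.worker_main, so Sentry has to be initialized inside the Python daemon process on each executor. One way to wire that up, sketched below, is a small wrapper daemon module; the module name sentry_daemon and the DSN are illustrative, while spark.python.use.daemon and spark.python.daemon.module are existing Spark configuration keys:

# sentry_daemon.py -- shipped to executors (e.g. via --py-files)
import sentry_sdk
from sentry_sdk.integrations.spark import SparkWorkerIntegration

import pyspark.daemon as original_daemon

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    integrations=[SparkWorkerIntegration()],
)

if __name__ == "__main__":
    # Run the stock PySpark daemon loop; worker_main is now _sentry_worker_main,
    # so uncaught worker exceptions are reported before the process exits.
    original_daemon.manager()

The job would then be submitted with spark.python.use.daemon=true and spark.python.daemon.module=sentry_daemon so executors start this wrapper instead of the default daemon.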