Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark')
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/__init__.py      |   4
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py  | 315
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py  | 116
3 files changed, 435 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/__init__.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/__init__.py
new file mode 100644
index 00000000..10d94163
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/__init__.py
@@ -0,0 +1,4 @@
+from sentry_sdk.integrations.spark.spark_driver import SparkIntegration
+from sentry_sdk.integrations.spark.spark_worker import SparkWorkerIntegration
+
+__all__ = ["SparkIntegration", "SparkWorkerIntegration"]
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
new file mode 100644
index 00000000..fac98535
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
@@ -0,0 +1,315 @@
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from typing import Any
+ from typing import Optional
+
+ from sentry_sdk._types import Event, Hint
+ from pyspark import SparkContext
+
+
+class SparkIntegration(Integration):
+ identifier = "spark"
+
+ @staticmethod
+ def setup_once():
+ # type: () -> None
+ _setup_sentry_tracing()
+
+
+def _set_app_properties():
+ # type: () -> None
+ """
+ Set properties on the driver that propagate to worker processes, so that the
+ worker integration has access to app_name and application_id.
+ """
+ from pyspark import SparkContext
+
+ spark_context = SparkContext._active_spark_context
+ if spark_context:
+ spark_context.setLocalProperty(
+ "sentry_app_name",
+ spark_context.appName,
+ )
+ spark_context.setLocalProperty(
+ "sentry_application_id",
+ spark_context.applicationId,
+ )
+
+
+def _start_sentry_listener(sc):
+ # type: (SparkContext) -> None
+ """
+ Ensure the Py4J callback server is started and register a custom `SparkListener` with the SparkContext.
+ """
+ from pyspark.java_gateway import ensure_callback_server_started
+
+ gw = sc._gateway
+ ensure_callback_server_started(gw)
+ listener = SentryListener()
+ sc._jsc.sc().addSparkListener(listener)
+
+
+def _add_event_processor(sc):
+ # type: (SparkContext) -> None
+ scope = sentry_sdk.get_isolation_scope()
+
+ @scope.add_event_processor
+ def process_event(event, hint):
+ # type: (Event, Hint) -> Optional[Event]
+ with capture_internal_exceptions():
+ if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
+ return event
+
+ if sc._active_spark_context is None:
+ return event
+
+ event.setdefault("user", {}).setdefault("id", sc.sparkUser())
+
+ event.setdefault("tags", {}).setdefault(
+ "executor.id", sc._conf.get("spark.executor.id")
+ )
+ event["tags"].setdefault(
+ "spark-submit.deployMode",
+ sc._conf.get("spark.submit.deployMode"),
+ )
+ event["tags"].setdefault("driver.host", sc._conf.get("spark.driver.host"))
+ event["tags"].setdefault("driver.port", sc._conf.get("spark.driver.port"))
+ event["tags"].setdefault("spark_version", sc.version)
+ event["tags"].setdefault("app_name", sc.appName)
+ event["tags"].setdefault("application_id", sc.applicationId)
+ event["tags"].setdefault("master", sc.master)
+ event["tags"].setdefault("spark_home", sc.sparkHome)
+
+ event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl)
+
+ return event
+
+
+def _activate_integration(sc):
+ # type: (SparkContext) -> None
+
+ _start_sentry_listener(sc)
+ _set_app_properties()
+ _add_event_processor(sc)
+
+
+def _patch_spark_context_init():
+ # type: () -> None
+ from pyspark import SparkContext
+
+ spark_context_init = SparkContext._do_init
+
+ @ensure_integration_enabled(SparkIntegration, spark_context_init)
+ def _sentry_patched_spark_context_init(self, *args, **kwargs):
+ # type: (SparkContext, *Any, **Any) -> Optional[Any]
+ rv = spark_context_init(self, *args, **kwargs)
+ _activate_integration(self)
+ return rv
+
+ SparkContext._do_init = _sentry_patched_spark_context_init
+
+
+def _setup_sentry_tracing():
+ # type: () -> None
+ from pyspark import SparkContext
+
+ if SparkContext._active_spark_context is not None:
+ _activate_integration(SparkContext._active_spark_context)
+ return
+ _patch_spark_context_init()
+
+
+class SparkListener:
+ def onApplicationEnd(self, applicationEnd): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onApplicationStart(self, applicationStart): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onBlockManagerAdded(self, blockManagerAdded): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onBlockManagerRemoved(self, blockManagerRemoved): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onBlockUpdated(self, blockUpdated): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onEnvironmentUpdate(self, environmentUpdate): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onExecutorAdded(self, executorAdded): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onExecutorBlacklisted(self, executorBlacklisted): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onExecutorBlacklistedForStage( # noqa: N802
+ self, executorBlacklistedForStage # noqa: N803
+ ):
+ # type: (Any) -> None
+ pass
+
+ def onExecutorMetricsUpdate(self, executorMetricsUpdate): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onExecutorRemoved(self, executorRemoved): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onJobEnd(self, jobEnd): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onJobStart(self, jobStart): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onNodeBlacklisted(self, nodeBlacklisted): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onNodeBlacklistedForStage(self, nodeBlacklistedForStage): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onNodeUnblacklisted(self, nodeUnblacklisted): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onOtherEvent(self, event): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onSpeculativeTaskSubmitted(self, speculativeTask): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onStageCompleted(self, stageCompleted): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onStageSubmitted(self, stageSubmitted): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onTaskEnd(self, taskEnd): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onTaskGettingResult(self, taskGettingResult): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onTaskStart(self, taskStart): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ def onUnpersistRDD(self, unpersistRDD): # noqa: N802,N803
+ # type: (Any) -> None
+ pass
+
+ class Java:
+ implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
+
+
+class SentryListener(SparkListener):
+ def _add_breadcrumb(
+ self,
+ level, # type: str
+ message, # type: str
+ data=None, # type: Optional[dict[str, Any]]
+ ):
+ # type: (...) -> None
+ sentry_sdk.get_isolation_scope().add_breadcrumb(
+ level=level, message=message, data=data
+ )
+
+ def onJobStart(self, jobStart): # noqa: N802,N803
+ # type: (Any) -> None
+ sentry_sdk.get_isolation_scope().clear_breadcrumbs()
+
+ message = "Job {} Started".format(jobStart.jobId())
+ self._add_breadcrumb(level="info", message=message)
+ _set_app_properties()
+
+ def onJobEnd(self, jobEnd): # noqa: N802,N803
+ # type: (Any) -> None
+ level = ""
+ message = ""
+ data = {"result": jobEnd.jobResult().toString()}
+
+ if jobEnd.jobResult().toString() == "JobSucceeded":
+ level = "info"
+ message = "Job {} Ended".format(jobEnd.jobId())
+ else:
+ level = "warning"
+ message = "Job {} Failed".format(jobEnd.jobId())
+
+ self._add_breadcrumb(level=level, message=message, data=data)
+
+ def onStageSubmitted(self, stageSubmitted): # noqa: N802,N803
+ # type: (Any) -> None
+ stage_info = stageSubmitted.stageInfo()
+ message = "Stage {} Submitted".format(stage_info.stageId())
+
+ data = {"name": stage_info.name()}
+ attempt_id = _get_attempt_id(stage_info)
+ if attempt_id is not None:
+ data["attemptId"] = attempt_id
+
+ self._add_breadcrumb(level="info", message=message, data=data)
+ _set_app_properties()
+
+ def onStageCompleted(self, stageCompleted): # noqa: N802,N803
+ # type: (Any) -> None
+ from py4j.protocol import Py4JJavaError # type: ignore
+
+ stage_info = stageCompleted.stageInfo()
+ message = ""
+ level = ""
+
+ data = {"name": stage_info.name()}
+ attempt_id = _get_attempt_id(stage_info)
+ if attempt_id is not None:
+ data["attemptId"] = attempt_id
+
+ # Use try/except because stageInfo.failureReason() returns a Scala Option, so .get() raises when the stage did not fail
+ try:
+ data["reason"] = stage_info.failureReason().get()
+ message = "Stage {} Failed".format(stage_info.stageId())
+ level = "warning"
+ except Py4JJavaError:
+ message = "Stage {} Completed".format(stage_info.stageId())
+ level = "info"
+
+ self._add_breadcrumb(level=level, message=message, data=data)
+
+
+def _get_attempt_id(stage_info):
+ # type: (Any) -> Optional[int]
+ try:
+ return stage_info.attemptId()
+ except Exception:
+ pass
+
+ try:
+ return stage_info.attemptNumber()
+ except Exception:
+ pass
+
+ return None
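Because _setup_sentry_tracing() also activates the integration directly when a SparkContext is already running, initializing Sentry after the context exists should work as well. A hedged sketch, again with a placeholder DSN:

    import sentry_sdk
    from pyspark import SparkContext
    from sentry_sdk.integrations.spark import SparkIntegration

    sc = SparkContext(appName="sentry-spark-example")  # context created first

    # setup_once() sees the active context and attaches the listener,
    # app properties, and event processor immediately.
    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
        integrations=[SparkIntegration()],
    )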
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
new file mode 100644
index 00000000..5340a0b3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_worker.py
@@ -0,0 +1,116 @@
+import sys
+
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.utils import (
+ capture_internal_exceptions,
+ exc_info_from_error,
+ single_exception_from_error_tuple,
+ walk_exception_chain,
+ event_hint_with_exc_info,
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from typing import Any
+ from typing import Optional
+
+ from sentry_sdk._types import ExcInfo, Event, Hint
+
+
+class SparkWorkerIntegration(Integration):
+ identifier = "spark_worker"
+
+ @staticmethod
+ def setup_once():
+ # type: () -> None
+ import pyspark.daemon as original_daemon
+
+ original_daemon.worker_main = _sentry_worker_main
+
+
+def _capture_exception(exc_info):
+ # type: (ExcInfo) -> None
+ client = sentry_sdk.get_client()
+
+ mechanism = {"type": "spark", "handled": False}
+
+ exc_info = exc_info_from_error(exc_info)
+
+ exc_type, exc_value, tb = exc_info
+ rv = []
+
+ # On exception, the worker calls sys.exit(-1), so SystemExit and similar errors can be ignored
+ for exc_type, exc_value, tb in walk_exception_chain(exc_info):
+ if exc_type not in (SystemExit, EOFError, ConnectionResetError):
+ rv.append(
+ single_exception_from_error_tuple(
+ exc_type, exc_value, tb, client.options, mechanism
+ )
+ )
+
+ if rv:
+ rv.reverse()
+ hint = event_hint_with_exc_info(exc_info)
+ event = {"level": "error", "exception": {"values": rv}} # type: Event
+
+ _tag_task_context()
+
+ sentry_sdk.capture_event(event, hint=hint)
+
+
+def _tag_task_context():
+ # type: () -> None
+ from pyspark.taskcontext import TaskContext
+
+ scope = sentry_sdk.get_isolation_scope()
+
+ @scope.add_event_processor
+ def process_event(event, hint):
+ # type: (Event, Hint) -> Optional[Event]
+ with capture_internal_exceptions():
+ integration = sentry_sdk.get_client().get_integration(
+ SparkWorkerIntegration
+ )
+ task_context = TaskContext.get()
+
+ if integration is None or task_context is None:
+ return event
+
+ event.setdefault("tags", {}).setdefault(
+ "stageId", str(task_context.stageId())
+ )
+ event["tags"].setdefault("partitionId", str(task_context.partitionId()))
+ event["tags"].setdefault("attemptNumber", str(task_context.attemptNumber()))
+ event["tags"].setdefault("taskAttemptId", str(task_context.taskAttemptId()))
+
+ if task_context._localProperties:
+ if "sentry_app_name" in task_context._localProperties:
+ event["tags"].setdefault(
+ "app_name", task_context._localProperties["sentry_app_name"]
+ )
+ event["tags"].setdefault(
+ "application_id",
+ task_context._localProperties["sentry_application_id"],
+ )
+
+ if "callSite.short" in task_context._localProperties:
+ event.setdefault("extra", {}).setdefault(
+ "callSite", task_context._localProperties["callSite.short"]
+ )
+
+ return event
+
+
+def _sentry_worker_main(*args, **kwargs):
+ # type: (*Optional[Any], **Optional[Any]) -> None
+ import pyspark.worker as original_worker
+
+ try:
+ original_worker.main(*args, **kwargs)
+ except SystemExit:
+ if sentry_sdk.get_client().get_integration(SparkWorkerIntegration) is not None:
+ exc_info = sys.exc_info()
+ with capture_internal_exceptions():
+ _capture_exception(exc_info)
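SparkWorkerIntegration.setup_once() replaces pyspark.daemon.worker_main, so the init call has to happen inside the Python daemon process on each executor. A hedged sketch of one way to wire that up; the module name sentry_daemon and the DSN are placeholders, and the spark.python.daemon.module approach assumes the standard (non-Windows) Python daemon is in use:

    # sentry_daemon.py -- placeholder custom daemon module shipped to executors
    import sentry_sdk
    from sentry_sdk.integrations.spark import SparkWorkerIntegration

    import pyspark.daemon as original_daemon

    if __name__ == "__main__":
        sentry_sdk.init(
            dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
            integrations=[SparkWorkerIntegration()],
        )
        original_daemon.manager()

The job would then be submitted with something like --conf spark.python.use.daemon=true --conf spark.python.daemon.module=sentry_daemon so Spark loads this module in place of the default pyspark.daemon.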