Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py | 315 |
1 file changed, 315 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
new file mode 100644
index 00000000..fac98535
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
@@ -0,0 +1,315 @@
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any
+    from typing import Optional
+
+    from sentry_sdk._types import Event, Hint
+    from pyspark import SparkContext
+
+
+class SparkIntegration(Integration):
+    identifier = "spark"
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        _setup_sentry_tracing()
+
+
+def _set_app_properties():
+    # type: () -> None
+    """
+    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
+    This allows worker integration to have access to app_name and application_id.
+    """
+    from pyspark import SparkContext
+
+    spark_context = SparkContext._active_spark_context
+    if spark_context:
+        spark_context.setLocalProperty(
+            "sentry_app_name",
+            spark_context.appName,
+        )
+        spark_context.setLocalProperty(
+            "sentry_application_id",
+            spark_context.applicationId,
+        )
+
+
+def _start_sentry_listener(sc):
+    # type: (SparkContext) -> None
+    """
+    Start java gateway server to add custom `SparkListener`
+    """
+    from pyspark.java_gateway import ensure_callback_server_started
+
+    gw = sc._gateway
+    ensure_callback_server_started(gw)
+    listener = SentryListener()
+    sc._jsc.sc().addSparkListener(listener)
+
+
+def _add_event_processor(sc):
+    # type: (SparkContext) -> None
+    scope = sentry_sdk.get_isolation_scope()
+
+    @scope.add_event_processor
+    def process_event(event, hint):
+        # type: (Event, Hint) -> Optional[Event]
+        with capture_internal_exceptions():
+            if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
+                return event
+
+            if sc._active_spark_context is None:
+                return event
+
+            event.setdefault("user", {}).setdefault("id", sc.sparkUser())
+
+            event.setdefault("tags", {}).setdefault(
+                "executor.id", sc._conf.get("spark.executor.id")
+            )
+            event["tags"].setdefault(
+                "spark-submit.deployMode",
+                sc._conf.get("spark.submit.deployMode"),
+            )
+            event["tags"].setdefault("driver.host", sc._conf.get("spark.driver.host"))
+            event["tags"].setdefault("driver.port", sc._conf.get("spark.driver.port"))
+            event["tags"].setdefault("spark_version", sc.version)
+            event["tags"].setdefault("app_name", sc.appName)
+            event["tags"].setdefault("application_id", sc.applicationId)
+            event["tags"].setdefault("master", sc.master)
+            event["tags"].setdefault("spark_home", sc.sparkHome)
+
+            event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl)
+
+        return event
+
+
+def _activate_integration(sc):
+    # type: (SparkContext) -> None
+
+    _start_sentry_listener(sc)
+    _set_app_properties()
+    _add_event_processor(sc)
+
+
+def _patch_spark_context_init():
+    # type: () -> None
+    from pyspark import SparkContext
+
+    spark_context_init = SparkContext._do_init
+
+    @ensure_integration_enabled(SparkIntegration, spark_context_init)
+    def _sentry_patched_spark_context_init(self, *args, **kwargs):
+        # type: (SparkContext, *Any, **Any) -> Optional[Any]
+        rv = spark_context_init(self, *args, **kwargs)
+        _activate_integration(self)
+        return rv
+
+    SparkContext._do_init = _sentry_patched_spark_context_init
+
+
+def _setup_sentry_tracing():
+    # type: () -> None
+    from pyspark import SparkContext
+
+    if SparkContext._active_spark_context is not None:
+        _activate_integration(SparkContext._active_spark_context)
+        return
+    _patch_spark_context_init()
+
+
+class SparkListener:
+    def onApplicationEnd(self, applicationEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onApplicationStart(self, applicationStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockManagerAdded(self, blockManagerAdded):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockManagerRemoved(self, blockManagerRemoved):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onBlockUpdated(self, blockUpdated):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onEnvironmentUpdate(self, environmentUpdate):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorAdded(self, executorAdded):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorBlacklisted(self, executorBlacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorBlacklistedForStage(  # noqa: N802
+        self, executorBlacklistedForStage  # noqa: N803
+    ):
+        # type: (Any) -> None
+        pass
+
+    def onExecutorMetricsUpdate(self, executorMetricsUpdate):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onExecutorRemoved(self, executorRemoved):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onJobEnd(self, jobEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onJobStart(self, jobStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeBlacklisted(self, nodeBlacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeBlacklistedForStage(self, nodeBlacklistedForStage):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onNodeUnblacklisted(self, nodeUnblacklisted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onOtherEvent(self, event):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onSpeculativeTaskSubmitted(self, speculativeTask):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskEnd(self, taskEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskGettingResult(self, taskGettingResult):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onTaskStart(self, taskStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    def onUnpersistRDD(self, unpersistRDD):  # noqa: N802,N803
+        # type: (Any) -> None
+        pass
+
+    class Java:
+        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
+
+
+class SentryListener(SparkListener):
+    def _add_breadcrumb(
+        self,
+        level,  # type: str
+        message,  # type: str
+        data=None,  # type: Optional[dict[str, Any]]
+    ):
+        # type: (...) -> None
+        sentry_sdk.get_isolation_scope().add_breadcrumb(
+            level=level, message=message, data=data
+        )
+
+    def onJobStart(self, jobStart):  # noqa: N802,N803
+        # type: (Any) -> None
+        sentry_sdk.get_isolation_scope().clear_breadcrumbs()
+
+        message = "Job {} Started".format(jobStart.jobId())
+        self._add_breadcrumb(level="info", message=message)
+        _set_app_properties()
+
+    def onJobEnd(self, jobEnd):  # noqa: N802,N803
+        # type: (Any) -> None
+        level = ""
+        message = ""
+        data = {"result": jobEnd.jobResult().toString()}
+
+        if jobEnd.jobResult().toString() == "JobSucceeded":
+            level = "info"
+            message = "Job {} Ended".format(jobEnd.jobId())
+        else:
+            level = "warning"
+            message = "Job {} Failed".format(jobEnd.jobId())
+
+        self._add_breadcrumb(level=level, message=message, data=data)
+
+    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
+        # type: (Any) -> None
+        stage_info = stageSubmitted.stageInfo()
+        message = "Stage {} Submitted".format(stage_info.stageId())
+
+        data = {"name": stage_info.name()}
+        attempt_id = _get_attempt_id(stage_info)
+        if attempt_id is not None:
+            data["attemptId"] = attempt_id
+
+        self._add_breadcrumb(level="info", message=message, data=data)
+        _set_app_properties()
+
+    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
+        # type: (Any) -> None
+        from py4j.protocol import Py4JJavaError  # type: ignore
+
+        stage_info = stageCompleted.stageInfo()
+        message = ""
+        level = ""
+
+        data = {"name": stage_info.name()}
+        attempt_id = _get_attempt_id(stage_info)
+        if attempt_id is not None:
+            data["attemptId"] = attempt_id
+
+        # Have to Try Except because stageInfo.failureReason() is typed with Scala Option
+        try:
+            data["reason"] = stage_info.failureReason().get()
+            message = "Stage {} Failed".format(stage_info.stageId())
+            level = "warning"
+        except Py4JJavaError:
+            message = "Stage {} Completed".format(stage_info.stageId())
+            level = "info"
+
+        self._add_breadcrumb(level=level, message=message, data=data)
+
+
+def _get_attempt_id(stage_info):
+    # type: (Any) -> Optional[int]
+    try:
+        return stage_info.attemptId()
+    except Exception:
+        pass
+
+    try:
+        return stage_info.attemptNumber()
+    except Exception:
+        pass
+
+    return None
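
For reference, a minimal driver-side setup that enables this integration might look like the sketch below; the DSN value and the app name "example-app" are placeholders, not part of this diff. If a SparkContext is already active when sentry_sdk.init() runs, _setup_sentry_tracing() activates the integration immediately; otherwise it patches SparkContext._do_init so that contexts created afterwards are instrumented.

import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
from pyspark import SparkConf, SparkContext

# Initialize the SDK first: setup_once() patches SparkContext._do_init,
# so the listener and event processor attach when the context is created.
sentry_sdk.init(
    dsn="https://<key>@<org>.ingest.sentry.io/<project>",  # placeholder DSN
    integrations=[SparkIntegration()],
)

sc = SparkContext(conf=SparkConf().setAppName("example-app"))  # placeholder app name

With this in place, the job and stage transitions reported by SentryListener appear as breadcrumbs on events captured from the driver, and process_event() tags each event with the Spark application metadata read from the SparkContext.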