about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/beam.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/beam.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/beam.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/beam.py176
1 files changed, 176 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/beam.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/beam.py
new file mode 100644
index 00000000..a2e4553f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/beam.py
@@ -0,0 +1,176 @@
+import sys
+import types
+from functools import wraps
+
+import sentry_sdk
+from sentry_sdk.integrations import Integration
+from sentry_sdk.integrations.logging import ignore_logger
+from sentry_sdk.utils import (
+    capture_internal_exceptions,
+    ensure_integration_enabled,
+    event_from_exception,
+    reraise,
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any
+    from typing import Iterator
+    from typing import TypeVar
+    from typing import Callable
+
+    from sentry_sdk._types import ExcInfo
+
+    T = TypeVar("T")
+    F = TypeVar("F", bound=Callable[..., Any])
+
+
+WRAPPED_FUNC = "_wrapped_{}_"
+INSPECT_FUNC = "_inspect_{}"  # Required format per apache_beam/transforms/core.py
+USED_FUNC = "_sentry_used_"
+
+
+class BeamIntegration(Integration):
+    identifier = "beam"
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        from apache_beam.transforms.core import DoFn, ParDo  # type: ignore
+
+        ignore_logger("root")
+        ignore_logger("bundle_processor.create")
+
+        function_patches = ["process", "start_bundle", "finish_bundle", "setup"]
+        for func_name in function_patches:
+            setattr(
+                DoFn,
+                INSPECT_FUNC.format(func_name),
+                _wrap_inspect_call(DoFn, func_name),
+            )
+
+        old_init = ParDo.__init__
+
+        def sentry_init_pardo(self, fn, *args, **kwargs):
+            # type: (ParDo, Any, *Any, **Any) -> Any
+            # Do not monkey patch init twice
+            if not getattr(self, "_sentry_is_patched", False):
+                for func_name in function_patches:
+                    if not hasattr(fn, func_name):
+                        continue
+                    wrapped_func = WRAPPED_FUNC.format(func_name)
+
+                    # Check to see if inspect is set and process is not
+                    # to avoid monkey patching process twice.
+                    # Check to see if function is part of object for
+                    # backwards compatibility.
+                    process_func = getattr(fn, func_name)
+                    inspect_func = getattr(fn, INSPECT_FUNC.format(func_name))
+                    if not getattr(inspect_func, USED_FUNC, False) and not getattr(
+                        process_func, USED_FUNC, False
+                    ):
+                        setattr(fn, wrapped_func, process_func)
+                        setattr(fn, func_name, _wrap_task_call(process_func))
+
+                self._sentry_is_patched = True
+            old_init(self, fn, *args, **kwargs)
+
+        ParDo.__init__ = sentry_init_pardo
+
+
+def _wrap_inspect_call(cls, func_name):
+    # type: (Any, Any) -> Any
+
+    if not hasattr(cls, func_name):
+        return None
+
+    def _inspect(self):
+        # type: (Any) -> Any
+        """
+        Inspect function overrides the way Beam gets argspec.
+        """
+        wrapped_func = WRAPPED_FUNC.format(func_name)
+        if hasattr(self, wrapped_func):
+            process_func = getattr(self, wrapped_func)
+        else:
+            process_func = getattr(self, func_name)
+            setattr(self, func_name, _wrap_task_call(process_func))
+            setattr(self, wrapped_func, process_func)
+
+        # getfullargspec is deprecated in more recent beam versions and get_function_args_defaults
+        # (which uses Signatures internally) should be used instead.
+        try:
+            from apache_beam.transforms.core import get_function_args_defaults
+
+            return get_function_args_defaults(process_func)
+        except ImportError:
+            from apache_beam.typehints.decorators import getfullargspec  # type: ignore
+
+            return getfullargspec(process_func)
+
+    setattr(_inspect, USED_FUNC, True)
+    return _inspect
+
+
+def _wrap_task_call(func):
+    # type: (F) -> F
+    """
+    Wrap task call with a try catch to get exceptions.
+    """
+
+    @wraps(func)
+    def _inner(*args, **kwargs):
+        # type: (*Any, **Any) -> Any
+        try:
+            gen = func(*args, **kwargs)
+        except Exception:
+            raise_exception()
+
+        if not isinstance(gen, types.GeneratorType):
+            return gen
+        return _wrap_generator_call(gen)
+
+    setattr(_inner, USED_FUNC, True)
+    return _inner  # type: ignore
+
+
+@ensure_integration_enabled(BeamIntegration)
+def _capture_exception(exc_info):
+    # type: (ExcInfo) -> None
+    """
+    Send Beam exception to Sentry.
+    """
+    client = sentry_sdk.get_client()
+
+    event, hint = event_from_exception(
+        exc_info,
+        client_options=client.options,
+        mechanism={"type": "beam", "handled": False},
+    )
+    sentry_sdk.capture_event(event, hint=hint)
+
+
+def raise_exception():
+    # type: () -> None
+    """
+    Raise an exception.
+    """
+    exc_info = sys.exc_info()
+    with capture_internal_exceptions():
+        _capture_exception(exc_info)
+    reraise(*exc_info)
+
+
+def _wrap_generator_call(gen):
+    # type: (Iterator[T]) -> Iterator[T]
+    """
+    Wrap the generator to handle any failures.
+    """
+    while True:
+        try:
+            yield next(gen)
+        except StopIteration:
+            break
+        except Exception:
+            raise_exception()