aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/utils.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/utils.py1907
1 files changed, 1907 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/utils.py b/.venv/lib/python3.12/site-packages/sentry_sdk/utils.py
new file mode 100644
index 00000000..89b2354c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/utils.py
@@ -0,0 +1,1907 @@
+import base64
+import json
+import linecache
+import logging
+import math
+import os
+import random
+import re
+import subprocess
+import sys
+import threading
+import time
+from collections import namedtuple
+from datetime import datetime, timezone
+from decimal import Decimal
+from functools import partial, partialmethod, wraps
+from numbers import Real
+from urllib.parse import parse_qs, unquote, urlencode, urlsplit, urlunsplit
+
+try:
+ # Python 3.11
+ from builtins import BaseExceptionGroup
+except ImportError:
+ # Python 3.10 and below
+ BaseExceptionGroup = None # type: ignore
+
+import sentry_sdk
+from sentry_sdk._compat import PY37
+from sentry_sdk.consts import (
+ DEFAULT_ADD_FULL_STACK,
+ DEFAULT_MAX_STACK_FRAMES,
+ DEFAULT_MAX_VALUE_LENGTH,
+ EndpointType,
+)
+from sentry_sdk._types import Annotated, AnnotatedValue, SENSITIVE_DATA_SUBSTITUTE
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from types import FrameType, TracebackType
+ from typing import (
+ Any,
+ Callable,
+ cast,
+ ContextManager,
+ Dict,
+ Iterator,
+ List,
+ NoReturn,
+ Optional,
+ overload,
+ ParamSpec,
+ Set,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
+ )
+
+ from gevent.hub import Hub
+
+ from sentry_sdk._types import Event, ExcInfo
+
+ P = ParamSpec("P")
+ R = TypeVar("R")
+
+
+epoch = datetime(1970, 1, 1)
+
+# The logger is created here but initialized in the debug support module
+logger = logging.getLogger("sentry_sdk.errors")
+
+_installed_modules = None
+
+BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$")
+
+FALSY_ENV_VALUES = frozenset(("false", "f", "n", "no", "off", "0"))
+TRUTHY_ENV_VALUES = frozenset(("true", "t", "y", "yes", "on", "1"))
+
+
+def env_to_bool(value, *, strict=False):
+ # type: (Any, Optional[bool]) -> bool | None
+ """Casts an ENV variable value to boolean using the constants defined above.
+ In strict mode, it may return None if the value doesn't match any of the predefined values.
+ """
+ normalized = str(value).lower() if value is not None else None
+
+ if normalized in FALSY_ENV_VALUES:
+ return False
+
+ if normalized in TRUTHY_ENV_VALUES:
+ return True
+
+ return None if strict else bool(value)
+
+
+def json_dumps(data):
+ # type: (Any) -> bytes
+ """Serialize data into a compact JSON representation encoded as UTF-8."""
+ return json.dumps(data, allow_nan=False, separators=(",", ":")).encode("utf-8")
+
+
+def get_git_revision():
+ # type: () -> Optional[str]
+ try:
+ with open(os.path.devnull, "w+") as null:
+ # prevent command prompt windows from popping up on windows
+ startupinfo = None
+ if sys.platform == "win32" or sys.platform == "cygwin":
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+
+ revision = (
+ subprocess.Popen(
+ ["git", "rev-parse", "HEAD"],
+ startupinfo=startupinfo,
+ stdout=subprocess.PIPE,
+ stderr=null,
+ stdin=null,
+ )
+ .communicate()[0]
+ .strip()
+ .decode("utf-8")
+ )
+ except (OSError, IOError, FileNotFoundError):
+ return None
+
+ return revision
+
+
+def get_default_release():
+ # type: () -> Optional[str]
+ """Try to guess a default release."""
+ release = os.environ.get("SENTRY_RELEASE")
+ if release:
+ return release
+
+ release = get_git_revision()
+ if release:
+ return release
+
+ for var in (
+ "HEROKU_SLUG_COMMIT",
+ "SOURCE_VERSION",
+ "CODEBUILD_RESOLVED_SOURCE_VERSION",
+ "CIRCLE_SHA1",
+ "GAE_DEPLOYMENT_ID",
+ ):
+ release = os.environ.get(var)
+ if release:
+ return release
+ return None
+
+
+def get_sdk_name(installed_integrations):
+ # type: (List[str]) -> str
+ """Return the SDK name including the name of the used web framework."""
+
+ # Note: I can not use for example sentry_sdk.integrations.django.DjangoIntegration.identifier
+ # here because if django is not installed the integration is not accessible.
+ framework_integrations = [
+ "django",
+ "flask",
+ "fastapi",
+ "bottle",
+ "falcon",
+ "quart",
+ "sanic",
+ "starlette",
+ "litestar",
+ "starlite",
+ "chalice",
+ "serverless",
+ "pyramid",
+ "tornado",
+ "aiohttp",
+ "aws_lambda",
+ "gcp",
+ "beam",
+ "asgi",
+ "wsgi",
+ ]
+
+ for integration in framework_integrations:
+ if integration in installed_integrations:
+ return "sentry.python.{}".format(integration)
+
+ return "sentry.python"
+
+
+class CaptureInternalException:
+ __slots__ = ()
+
+ def __enter__(self):
+ # type: () -> ContextManager[Any]
+ return self
+
+ def __exit__(self, ty, value, tb):
+ # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> bool
+ if ty is not None and value is not None:
+ capture_internal_exception((ty, value, tb))
+
+ return True
+
+
+_CAPTURE_INTERNAL_EXCEPTION = CaptureInternalException()
+
+
+def capture_internal_exceptions():
+ # type: () -> ContextManager[Any]
+ return _CAPTURE_INTERNAL_EXCEPTION
+
+
+def capture_internal_exception(exc_info):
+ # type: (ExcInfo) -> None
+ """
+ Capture an exception that is likely caused by a bug in the SDK
+ itself.
+
+ These exceptions do not end up in Sentry and are just logged instead.
+ """
+ if sentry_sdk.get_client().is_active():
+ logger.error("Internal error in sentry_sdk", exc_info=exc_info)
+
+
+def to_timestamp(value):
+ # type: (datetime) -> float
+ return (value - epoch).total_seconds()
+
+
+def format_timestamp(value):
+ # type: (datetime) -> str
+ """Formats a timestamp in RFC 3339 format.
+
+ Any datetime objects with a non-UTC timezone are converted to UTC, so that all timestamps are formatted in UTC.
+ """
+ utctime = value.astimezone(timezone.utc)
+
+ # We use this custom formatting rather than isoformat for backwards compatibility (we have used this format for
+ # several years now), and isoformat is slightly different.
+ return utctime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+
+
+ISO_TZ_SEPARATORS = frozenset(("+", "-"))
+
+
+def datetime_from_isoformat(value):
+ # type: (str) -> datetime
+ try:
+ result = datetime.fromisoformat(value)
+ except (AttributeError, ValueError):
+ # py 3.6
+ timestamp_format = (
+ "%Y-%m-%dT%H:%M:%S.%f" if "." in value else "%Y-%m-%dT%H:%M:%S"
+ )
+ if value.endswith("Z"):
+ value = value[:-1] + "+0000"
+
+ if value[-6] in ISO_TZ_SEPARATORS:
+ timestamp_format += "%z"
+ value = value[:-3] + value[-2:]
+ elif value[-5] in ISO_TZ_SEPARATORS:
+ timestamp_format += "%z"
+
+ result = datetime.strptime(value, timestamp_format)
+ return result.astimezone(timezone.utc)
+
+
+def event_hint_with_exc_info(exc_info=None):
+ # type: (Optional[ExcInfo]) -> Dict[str, Optional[ExcInfo]]
+ """Creates a hint with the exc info filled in."""
+ if exc_info is None:
+ exc_info = sys.exc_info()
+ else:
+ exc_info = exc_info_from_error(exc_info)
+ if exc_info[0] is None:
+ exc_info = None
+ return {"exc_info": exc_info}
+
+
+class BadDsn(ValueError):
+ """Raised on invalid DSNs."""
+
+
+class Dsn:
+ """Represents a DSN."""
+
+ def __init__(self, value):
+ # type: (Union[Dsn, str]) -> None
+ if isinstance(value, Dsn):
+ self.__dict__ = dict(value.__dict__)
+ return
+ parts = urlsplit(str(value))
+
+ if parts.scheme not in ("http", "https"):
+ raise BadDsn("Unsupported scheme %r" % parts.scheme)
+ self.scheme = parts.scheme
+
+ if parts.hostname is None:
+ raise BadDsn("Missing hostname")
+
+ self.host = parts.hostname
+
+ if parts.port is None:
+ self.port = self.scheme == "https" and 443 or 80 # type: int
+ else:
+ self.port = parts.port
+
+ if not parts.username:
+ raise BadDsn("Missing public key")
+
+ self.public_key = parts.username
+ self.secret_key = parts.password
+
+ path = parts.path.rsplit("/", 1)
+
+ try:
+ self.project_id = str(int(path.pop()))
+ except (ValueError, TypeError):
+ raise BadDsn("Invalid project in DSN (%r)" % (parts.path or "")[1:])
+
+ self.path = "/".join(path) + "/"
+
+ @property
+ def netloc(self):
+ # type: () -> str
+ """The netloc part of a DSN."""
+ rv = self.host
+ if (self.scheme, self.port) not in (("http", 80), ("https", 443)):
+ rv = "%s:%s" % (rv, self.port)
+ return rv
+
+ def to_auth(self, client=None):
+ # type: (Optional[Any]) -> Auth
+ """Returns the auth info object for this dsn."""
+ return Auth(
+ scheme=self.scheme,
+ host=self.netloc,
+ path=self.path,
+ project_id=self.project_id,
+ public_key=self.public_key,
+ secret_key=self.secret_key,
+ client=client,
+ )
+
+ def __str__(self):
+ # type: () -> str
+ return "%s://%s%s@%s%s%s" % (
+ self.scheme,
+ self.public_key,
+ self.secret_key and "@" + self.secret_key or "",
+ self.netloc,
+ self.path,
+ self.project_id,
+ )
+
+
+class Auth:
+ """Helper object that represents the auth info."""
+
+ def __init__(
+ self,
+ scheme,
+ host,
+ project_id,
+ public_key,
+ secret_key=None,
+ version=7,
+ client=None,
+ path="/",
+ ):
+ # type: (str, str, str, str, Optional[str], int, Optional[Any], str) -> None
+ self.scheme = scheme
+ self.host = host
+ self.path = path
+ self.project_id = project_id
+ self.public_key = public_key
+ self.secret_key = secret_key
+ self.version = version
+ self.client = client
+
+ def get_api_url(
+ self, type=EndpointType.ENVELOPE # type: EndpointType
+ ):
+ # type: (...) -> str
+ """Returns the API url for storing events."""
+ return "%s://%s%sapi/%s/%s/" % (
+ self.scheme,
+ self.host,
+ self.path,
+ self.project_id,
+ type.value,
+ )
+
+ def to_header(self):
+ # type: () -> str
+ """Returns the auth header a string."""
+ rv = [("sentry_key", self.public_key), ("sentry_version", self.version)]
+ if self.client is not None:
+ rv.append(("sentry_client", self.client))
+ if self.secret_key is not None:
+ rv.append(("sentry_secret", self.secret_key))
+ return "Sentry " + ", ".join("%s=%s" % (key, value) for key, value in rv)
+
+
+def get_type_name(cls):
+ # type: (Optional[type]) -> Optional[str]
+ return getattr(cls, "__qualname__", None) or getattr(cls, "__name__", None)
+
+
+def get_type_module(cls):
+ # type: (Optional[type]) -> Optional[str]
+ mod = getattr(cls, "__module__", None)
+ if mod not in (None, "builtins", "__builtins__"):
+ return mod
+ return None
+
+
+def should_hide_frame(frame):
+ # type: (FrameType) -> bool
+ try:
+ mod = frame.f_globals["__name__"]
+ if mod.startswith("sentry_sdk."):
+ return True
+ except (AttributeError, KeyError):
+ pass
+
+ for flag_name in "__traceback_hide__", "__tracebackhide__":
+ try:
+ if frame.f_locals[flag_name]:
+ return True
+ except Exception:
+ pass
+
+ return False
+
+
+def iter_stacks(tb):
+ # type: (Optional[TracebackType]) -> Iterator[TracebackType]
+ tb_ = tb # type: Optional[TracebackType]
+ while tb_ is not None:
+ if not should_hide_frame(tb_.tb_frame):
+ yield tb_
+ tb_ = tb_.tb_next
+
+
+def get_lines_from_file(
+ filename, # type: str
+ lineno, # type: int
+ max_length=None, # type: Optional[int]
+ loader=None, # type: Optional[Any]
+ module=None, # type: Optional[str]
+):
+ # type: (...) -> Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]]
+ context_lines = 5
+ source = None
+ if loader is not None and hasattr(loader, "get_source"):
+ try:
+ source_str = loader.get_source(module) # type: Optional[str]
+ except (ImportError, IOError):
+ source_str = None
+ if source_str is not None:
+ source = source_str.splitlines()
+
+ if source is None:
+ try:
+ source = linecache.getlines(filename)
+ except (OSError, IOError):
+ return [], None, []
+
+ if not source:
+ return [], None, []
+
+ lower_bound = max(0, lineno - context_lines)
+ upper_bound = min(lineno + 1 + context_lines, len(source))
+
+ try:
+ pre_context = [
+ strip_string(line.strip("\r\n"), max_length=max_length)
+ for line in source[lower_bound:lineno]
+ ]
+ context_line = strip_string(source[lineno].strip("\r\n"), max_length=max_length)
+ post_context = [
+ strip_string(line.strip("\r\n"), max_length=max_length)
+ for line in source[(lineno + 1) : upper_bound]
+ ]
+ return pre_context, context_line, post_context
+ except IndexError:
+ # the file may have changed since it was loaded into memory
+ return [], None, []
+
+
+def get_source_context(
+ frame, # type: FrameType
+ tb_lineno, # type: Optional[int]
+ max_value_length=None, # type: Optional[int]
+):
+ # type: (...) -> Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]]
+ try:
+ abs_path = frame.f_code.co_filename # type: Optional[str]
+ except Exception:
+ abs_path = None
+ try:
+ module = frame.f_globals["__name__"]
+ except Exception:
+ return [], None, []
+ try:
+ loader = frame.f_globals["__loader__"]
+ except Exception:
+ loader = None
+
+ if tb_lineno is not None and abs_path:
+ lineno = tb_lineno - 1
+ return get_lines_from_file(
+ abs_path, lineno, max_value_length, loader=loader, module=module
+ )
+
+ return [], None, []
+
+
+def safe_str(value):
+ # type: (Any) -> str
+ try:
+ return str(value)
+ except Exception:
+ return safe_repr(value)
+
+
+def safe_repr(value):
+ # type: (Any) -> str
+ try:
+ return repr(value)
+ except Exception:
+ return "<broken repr>"
+
+
+def filename_for_module(module, abs_path):
+ # type: (Optional[str], Optional[str]) -> Optional[str]
+ if not abs_path or not module:
+ return abs_path
+
+ try:
+ if abs_path.endswith(".pyc"):
+ abs_path = abs_path[:-1]
+
+ base_module = module.split(".", 1)[0]
+ if base_module == module:
+ return os.path.basename(abs_path)
+
+ base_module_path = sys.modules[base_module].__file__
+ if not base_module_path:
+ return abs_path
+
+ return abs_path.split(base_module_path.rsplit(os.sep, 2)[0], 1)[-1].lstrip(
+ os.sep
+ )
+ except Exception:
+ return abs_path
+
+
+def serialize_frame(
+ frame,
+ tb_lineno=None,
+ include_local_variables=True,
+ include_source_context=True,
+ max_value_length=None,
+ custom_repr=None,
+):
+ # type: (FrameType, Optional[int], bool, bool, Optional[int], Optional[Callable[..., Optional[str]]]) -> Dict[str, Any]
+ f_code = getattr(frame, "f_code", None)
+ if not f_code:
+ abs_path = None
+ function = None
+ else:
+ abs_path = frame.f_code.co_filename
+ function = frame.f_code.co_name
+ try:
+ module = frame.f_globals["__name__"]
+ except Exception:
+ module = None
+
+ if tb_lineno is None:
+ tb_lineno = frame.f_lineno
+
+ rv = {
+ "filename": filename_for_module(module, abs_path) or None,
+ "abs_path": os.path.abspath(abs_path) if abs_path else None,
+ "function": function or "<unknown>",
+ "module": module,
+ "lineno": tb_lineno,
+ } # type: Dict[str, Any]
+
+ if include_source_context:
+ rv["pre_context"], rv["context_line"], rv["post_context"] = get_source_context(
+ frame, tb_lineno, max_value_length
+ )
+
+ if include_local_variables:
+ from sentry_sdk.serializer import serialize
+
+ rv["vars"] = serialize(
+ dict(frame.f_locals), is_vars=True, custom_repr=custom_repr
+ )
+
+ return rv
+
+
+def current_stacktrace(
+ include_local_variables=True, # type: bool
+ include_source_context=True, # type: bool
+ max_value_length=None, # type: Optional[int]
+):
+ # type: (...) -> Dict[str, Any]
+ __tracebackhide__ = True
+ frames = []
+
+ f = sys._getframe() # type: Optional[FrameType]
+ while f is not None:
+ if not should_hide_frame(f):
+ frames.append(
+ serialize_frame(
+ f,
+ include_local_variables=include_local_variables,
+ include_source_context=include_source_context,
+ max_value_length=max_value_length,
+ )
+ )
+ f = f.f_back
+
+ frames.reverse()
+
+ return {"frames": frames}
+
+
+def get_errno(exc_value):
+ # type: (BaseException) -> Optional[Any]
+ return getattr(exc_value, "errno", None)
+
+
+def get_error_message(exc_value):
+ # type: (Optional[BaseException]) -> str
+ message = (
+ getattr(exc_value, "message", "")
+ or getattr(exc_value, "detail", "")
+ or safe_str(exc_value)
+ ) # type: str
+
+ # __notes__ should be a list of strings when notes are added
+ # via add_note, but can be anything else if __notes__ is set
+ # directly. We only support strings in __notes__, since that
+ # is the correct use.
+ notes = getattr(exc_value, "__notes__", None) # type: object
+ if isinstance(notes, list) and len(notes) > 0:
+ message += "\n" + "\n".join(note for note in notes if isinstance(note, str))
+
+ return message
+
+
+def single_exception_from_error_tuple(
+ exc_type, # type: Optional[type]
+ exc_value, # type: Optional[BaseException]
+ tb, # type: Optional[TracebackType]
+ client_options=None, # type: Optional[Dict[str, Any]]
+ mechanism=None, # type: Optional[Dict[str, Any]]
+ exception_id=None, # type: Optional[int]
+ parent_id=None, # type: Optional[int]
+ source=None, # type: Optional[str]
+ full_stack=None, # type: Optional[list[dict[str, Any]]]
+):
+ # type: (...) -> Dict[str, Any]
+ """
+ Creates a dict that goes into the events `exception.values` list and is ingestible by Sentry.
+
+ See the Exception Interface documentation for more details:
+ https://develop.sentry.dev/sdk/event-payloads/exception/
+ """
+ exception_value = {} # type: Dict[str, Any]
+ exception_value["mechanism"] = (
+ mechanism.copy() if mechanism else {"type": "generic", "handled": True}
+ )
+ if exception_id is not None:
+ exception_value["mechanism"]["exception_id"] = exception_id
+
+ if exc_value is not None:
+ errno = get_errno(exc_value)
+ else:
+ errno = None
+
+ if errno is not None:
+ exception_value["mechanism"].setdefault("meta", {}).setdefault(
+ "errno", {}
+ ).setdefault("number", errno)
+
+ if source is not None:
+ exception_value["mechanism"]["source"] = source
+
+ is_root_exception = exception_id == 0
+ if not is_root_exception and parent_id is not None:
+ exception_value["mechanism"]["parent_id"] = parent_id
+ exception_value["mechanism"]["type"] = "chained"
+
+ if is_root_exception and "type" not in exception_value["mechanism"]:
+ exception_value["mechanism"]["type"] = "generic"
+
+ is_exception_group = BaseExceptionGroup is not None and isinstance(
+ exc_value, BaseExceptionGroup
+ )
+ if is_exception_group:
+ exception_value["mechanism"]["is_exception_group"] = True
+
+ exception_value["module"] = get_type_module(exc_type)
+ exception_value["type"] = get_type_name(exc_type)
+ exception_value["value"] = get_error_message(exc_value)
+
+ if client_options is None:
+ include_local_variables = True
+ include_source_context = True
+ max_value_length = DEFAULT_MAX_VALUE_LENGTH # fallback
+ custom_repr = None
+ else:
+ include_local_variables = client_options["include_local_variables"]
+ include_source_context = client_options["include_source_context"]
+ max_value_length = client_options["max_value_length"]
+ custom_repr = client_options.get("custom_repr")
+
+ frames = [
+ serialize_frame(
+ tb.tb_frame,
+ tb_lineno=tb.tb_lineno,
+ include_local_variables=include_local_variables,
+ include_source_context=include_source_context,
+ max_value_length=max_value_length,
+ custom_repr=custom_repr,
+ )
+ for tb in iter_stacks(tb)
+ ] # type: List[Dict[str, Any]]
+
+ if frames:
+ if not full_stack:
+ new_frames = frames
+ else:
+ new_frames = merge_stack_frames(frames, full_stack, client_options)
+
+ exception_value["stacktrace"] = {"frames": new_frames}
+
+ return exception_value
+
+
+HAS_CHAINED_EXCEPTIONS = hasattr(Exception, "__suppress_context__")
+
+if HAS_CHAINED_EXCEPTIONS:
+
+ def walk_exception_chain(exc_info):
+ # type: (ExcInfo) -> Iterator[ExcInfo]
+ exc_type, exc_value, tb = exc_info
+
+ seen_exceptions = []
+ seen_exception_ids = set() # type: Set[int]
+
+ while (
+ exc_type is not None
+ and exc_value is not None
+ and id(exc_value) not in seen_exception_ids
+ ):
+ yield exc_type, exc_value, tb
+
+ # Avoid hashing random types we don't know anything
+ # about. Use the list to keep a ref so that the `id` is
+ # not used for another object.
+ seen_exceptions.append(exc_value)
+ seen_exception_ids.add(id(exc_value))
+
+ if exc_value.__suppress_context__:
+ cause = exc_value.__cause__
+ else:
+ cause = exc_value.__context__
+ if cause is None:
+ break
+ exc_type = type(cause)
+ exc_value = cause
+ tb = getattr(cause, "__traceback__", None)
+
+else:
+
+ def walk_exception_chain(exc_info):
+ # type: (ExcInfo) -> Iterator[ExcInfo]
+ yield exc_info
+
+
+def exceptions_from_error(
+ exc_type, # type: Optional[type]
+ exc_value, # type: Optional[BaseException]
+ tb, # type: Optional[TracebackType]
+ client_options=None, # type: Optional[Dict[str, Any]]
+ mechanism=None, # type: Optional[Dict[str, Any]]
+ exception_id=0, # type: int
+ parent_id=0, # type: int
+ source=None, # type: Optional[str]
+ full_stack=None, # type: Optional[list[dict[str, Any]]]
+):
+ # type: (...) -> Tuple[int, List[Dict[str, Any]]]
+ """
+ Creates the list of exceptions.
+ This can include chained exceptions and exceptions from an ExceptionGroup.
+
+ See the Exception Interface documentation for more details:
+ https://develop.sentry.dev/sdk/event-payloads/exception/
+ """
+
+ parent = single_exception_from_error_tuple(
+ exc_type=exc_type,
+ exc_value=exc_value,
+ tb=tb,
+ client_options=client_options,
+ mechanism=mechanism,
+ exception_id=exception_id,
+ parent_id=parent_id,
+ source=source,
+ full_stack=full_stack,
+ )
+ exceptions = [parent]
+
+ parent_id = exception_id
+ exception_id += 1
+
+ should_supress_context = hasattr(exc_value, "__suppress_context__") and exc_value.__suppress_context__ # type: ignore
+ if should_supress_context:
+ # Add direct cause.
+ # The field `__cause__` is set when raised with the exception (using the `from` keyword).
+ exception_has_cause = (
+ exc_value
+ and hasattr(exc_value, "__cause__")
+ and exc_value.__cause__ is not None
+ )
+ if exception_has_cause:
+ cause = exc_value.__cause__ # type: ignore
+ (exception_id, child_exceptions) = exceptions_from_error(
+ exc_type=type(cause),
+ exc_value=cause,
+ tb=getattr(cause, "__traceback__", None),
+ client_options=client_options,
+ mechanism=mechanism,
+ exception_id=exception_id,
+ source="__cause__",
+ full_stack=full_stack,
+ )
+ exceptions.extend(child_exceptions)
+
+ else:
+ # Add indirect cause.
+ # The field `__context__` is assigned if another exception occurs while handling the exception.
+ exception_has_content = (
+ exc_value
+ and hasattr(exc_value, "__context__")
+ and exc_value.__context__ is not None
+ )
+ if exception_has_content:
+ context = exc_value.__context__ # type: ignore
+ (exception_id, child_exceptions) = exceptions_from_error(
+ exc_type=type(context),
+ exc_value=context,
+ tb=getattr(context, "__traceback__", None),
+ client_options=client_options,
+ mechanism=mechanism,
+ exception_id=exception_id,
+ source="__context__",
+ full_stack=full_stack,
+ )
+ exceptions.extend(child_exceptions)
+
+ # Add exceptions from an ExceptionGroup.
+ is_exception_group = exc_value and hasattr(exc_value, "exceptions")
+ if is_exception_group:
+ for idx, e in enumerate(exc_value.exceptions): # type: ignore
+ (exception_id, child_exceptions) = exceptions_from_error(
+ exc_type=type(e),
+ exc_value=e,
+ tb=getattr(e, "__traceback__", None),
+ client_options=client_options,
+ mechanism=mechanism,
+ exception_id=exception_id,
+ parent_id=parent_id,
+ source="exceptions[%s]" % idx,
+ full_stack=full_stack,
+ )
+ exceptions.extend(child_exceptions)
+
+ return (exception_id, exceptions)
+
+
+def exceptions_from_error_tuple(
+ exc_info, # type: ExcInfo
+ client_options=None, # type: Optional[Dict[str, Any]]
+ mechanism=None, # type: Optional[Dict[str, Any]]
+ full_stack=None, # type: Optional[list[dict[str, Any]]]
+):
+ # type: (...) -> List[Dict[str, Any]]
+ exc_type, exc_value, tb = exc_info
+
+ is_exception_group = BaseExceptionGroup is not None and isinstance(
+ exc_value, BaseExceptionGroup
+ )
+
+ if is_exception_group:
+ (_, exceptions) = exceptions_from_error(
+ exc_type=exc_type,
+ exc_value=exc_value,
+ tb=tb,
+ client_options=client_options,
+ mechanism=mechanism,
+ exception_id=0,
+ parent_id=0,
+ full_stack=full_stack,
+ )
+
+ else:
+ exceptions = []
+ for exc_type, exc_value, tb in walk_exception_chain(exc_info):
+ exceptions.append(
+ single_exception_from_error_tuple(
+ exc_type=exc_type,
+ exc_value=exc_value,
+ tb=tb,
+ client_options=client_options,
+ mechanism=mechanism,
+ full_stack=full_stack,
+ )
+ )
+
+ exceptions.reverse()
+
+ return exceptions
+
+
+def to_string(value):
+ # type: (str) -> str
+ try:
+ return str(value)
+ except UnicodeDecodeError:
+ return repr(value)[1:-1]
+
+
+def iter_event_stacktraces(event):
+ # type: (Event) -> Iterator[Dict[str, Any]]
+ if "stacktrace" in event:
+ yield event["stacktrace"]
+ if "threads" in event:
+ for thread in event["threads"].get("values") or ():
+ if "stacktrace" in thread:
+ yield thread["stacktrace"]
+ if "exception" in event:
+ for exception in event["exception"].get("values") or ():
+ if "stacktrace" in exception:
+ yield exception["stacktrace"]
+
+
+def iter_event_frames(event):
+ # type: (Event) -> Iterator[Dict[str, Any]]
+ for stacktrace in iter_event_stacktraces(event):
+ for frame in stacktrace.get("frames") or ():
+ yield frame
+
+
+def handle_in_app(event, in_app_exclude=None, in_app_include=None, project_root=None):
+ # type: (Event, Optional[List[str]], Optional[List[str]], Optional[str]) -> Event
+ for stacktrace in iter_event_stacktraces(event):
+ set_in_app_in_frames(
+ stacktrace.get("frames"),
+ in_app_exclude=in_app_exclude,
+ in_app_include=in_app_include,
+ project_root=project_root,
+ )
+
+ return event
+
+
+def set_in_app_in_frames(frames, in_app_exclude, in_app_include, project_root=None):
+ # type: (Any, Optional[List[str]], Optional[List[str]], Optional[str]) -> Optional[Any]
+ if not frames:
+ return None
+
+ for frame in frames:
+ # if frame has already been marked as in_app, skip it
+ current_in_app = frame.get("in_app")
+ if current_in_app is not None:
+ continue
+
+ module = frame.get("module")
+
+ # check if module in frame is in the list of modules to include
+ if _module_in_list(module, in_app_include):
+ frame["in_app"] = True
+ continue
+
+ # check if module in frame is in the list of modules to exclude
+ if _module_in_list(module, in_app_exclude):
+ frame["in_app"] = False
+ continue
+
+ # if frame has no abs_path, skip further checks
+ abs_path = frame.get("abs_path")
+ if abs_path is None:
+ continue
+
+ if _is_external_source(abs_path):
+ frame["in_app"] = False
+ continue
+
+ if _is_in_project_root(abs_path, project_root):
+ frame["in_app"] = True
+ continue
+
+ return frames
+
+
+def exc_info_from_error(error):
+ # type: (Union[BaseException, ExcInfo]) -> ExcInfo
+ if isinstance(error, tuple) and len(error) == 3:
+ exc_type, exc_value, tb = error
+ elif isinstance(error, BaseException):
+ tb = getattr(error, "__traceback__", None)
+ if tb is not None:
+ exc_type = type(error)
+ exc_value = error
+ else:
+ exc_type, exc_value, tb = sys.exc_info()
+ if exc_value is not error:
+ tb = None
+ exc_value = error
+ exc_type = type(error)
+
+ else:
+ raise ValueError("Expected Exception object to report, got %s!" % type(error))
+
+ exc_info = (exc_type, exc_value, tb)
+
+ if TYPE_CHECKING:
+ # This cast is safe because exc_type and exc_value are either both
+ # None or both not None.
+ exc_info = cast(ExcInfo, exc_info)
+
+ return exc_info
+
+
+def merge_stack_frames(frames, full_stack, client_options):
+ # type: (List[Dict[str, Any]], List[Dict[str, Any]], Optional[Dict[str, Any]]) -> List[Dict[str, Any]]
+ """
+ Add the missing frames from full_stack to frames and return the merged list.
+ """
+ frame_ids = {
+ (
+ frame["abs_path"],
+ frame["context_line"],
+ frame["lineno"],
+ frame["function"],
+ )
+ for frame in frames
+ }
+
+ new_frames = [
+ stackframe
+ for stackframe in full_stack
+ if (
+ stackframe["abs_path"],
+ stackframe["context_line"],
+ stackframe["lineno"],
+ stackframe["function"],
+ )
+ not in frame_ids
+ ]
+ new_frames.extend(frames)
+
+ # Limit the number of frames
+ max_stack_frames = (
+ client_options.get("max_stack_frames", DEFAULT_MAX_STACK_FRAMES)
+ if client_options
+ else None
+ )
+ if max_stack_frames is not None:
+ new_frames = new_frames[len(new_frames) - max_stack_frames :]
+
+ return new_frames
+
+
+def event_from_exception(
+ exc_info, # type: Union[BaseException, ExcInfo]
+ client_options=None, # type: Optional[Dict[str, Any]]
+ mechanism=None, # type: Optional[Dict[str, Any]]
+):
+ # type: (...) -> Tuple[Event, Dict[str, Any]]
+ exc_info = exc_info_from_error(exc_info)
+ hint = event_hint_with_exc_info(exc_info)
+
+ if client_options and client_options.get("add_full_stack", DEFAULT_ADD_FULL_STACK):
+ full_stack = current_stacktrace(
+ include_local_variables=client_options["include_local_variables"],
+ max_value_length=client_options["max_value_length"],
+ )["frames"]
+ else:
+ full_stack = None
+
+ return (
+ {
+ "level": "error",
+ "exception": {
+ "values": exceptions_from_error_tuple(
+ exc_info, client_options, mechanism, full_stack
+ )
+ },
+ },
+ hint,
+ )
+
+
+def _module_in_list(name, items):
+ # type: (Optional[str], Optional[List[str]]) -> bool
+ if name is None:
+ return False
+
+ if not items:
+ return False
+
+ for item in items:
+ if item == name or name.startswith(item + "."):
+ return True
+
+ return False
+
+
+def _is_external_source(abs_path):
+ # type: (Optional[str]) -> bool
+ # check if frame is in 'site-packages' or 'dist-packages'
+ if abs_path is None:
+ return False
+
+ external_source = (
+ re.search(r"[\\/](?:dist|site)-packages[\\/]", abs_path) is not None
+ )
+ return external_source
+
+
+def _is_in_project_root(abs_path, project_root):
+ # type: (Optional[str], Optional[str]) -> bool
+ if abs_path is None or project_root is None:
+ return False
+
+ # check if path is in the project root
+ if abs_path.startswith(project_root):
+ return True
+
+ return False
+
+
+def _truncate_by_bytes(string, max_bytes):
+ # type: (str, int) -> str
+ """
+ Truncate a UTF-8-encodable string to the last full codepoint so that it fits in max_bytes.
+ """
+ truncated = string.encode("utf-8")[: max_bytes - 3].decode("utf-8", errors="ignore")
+
+ return truncated + "..."
+
+
+def _get_size_in_bytes(value):
+ # type: (str) -> Optional[int]
+ try:
+ return len(value.encode("utf-8"))
+ except (UnicodeEncodeError, UnicodeDecodeError):
+ return None
+
+
+def strip_string(value, max_length=None):
+ # type: (str, Optional[int]) -> Union[AnnotatedValue, str]
+ if not value:
+ return value
+
+ if max_length is None:
+ max_length = DEFAULT_MAX_VALUE_LENGTH
+
+ byte_size = _get_size_in_bytes(value)
+ text_size = len(value)
+
+ if byte_size is not None and byte_size > max_length:
+ # truncate to max_length bytes, preserving code points
+ truncated_value = _truncate_by_bytes(value, max_length)
+ elif text_size is not None and text_size > max_length:
+ # fallback to truncating by string length
+ truncated_value = value[: max_length - 3] + "..."
+ else:
+ return value
+
+ return AnnotatedValue(
+ value=truncated_value,
+ metadata={
+ "len": byte_size or text_size,
+ "rem": [["!limit", "x", max_length - 3, max_length]],
+ },
+ )
+
+
+def parse_version(version):
+ # type: (str) -> Optional[Tuple[int, ...]]
+ """
+ Parses a version string into a tuple of integers.
+ This uses the parsing loging from PEP 440:
+ https://peps.python.org/pep-0440/#appendix-b-parsing-version-strings-with-regular-expressions
+ """
+ VERSION_PATTERN = r""" # noqa: N806
+ v?
+ (?:
+ (?:(?P<epoch>[0-9]+)!)? # epoch
+ (?P<release>[0-9]+(?:\.[0-9]+)*) # release segment
+ (?P<pre> # pre-release
+ [-_\.]?
+ (?P<pre_l>(a|b|c|rc|alpha|beta|pre|preview))
+ [-_\.]?
+ (?P<pre_n>[0-9]+)?
+ )?
+ (?P<post> # post release
+ (?:-(?P<post_n1>[0-9]+))
+ |
+ (?:
+ [-_\.]?
+ (?P<post_l>post|rev|r)
+ [-_\.]?
+ (?P<post_n2>[0-9]+)?
+ )
+ )?
+ (?P<dev> # dev release
+ [-_\.]?
+ (?P<dev_l>dev)
+ [-_\.]?
+ (?P<dev_n>[0-9]+)?
+ )?
+ )
+ (?:\+(?P<local>[a-z0-9]+(?:[-_\.][a-z0-9]+)*))? # local version
+ """
+
+ pattern = re.compile(
+ r"^\s*" + VERSION_PATTERN + r"\s*$",
+ re.VERBOSE | re.IGNORECASE,
+ )
+
+ try:
+ release = pattern.match(version).groupdict()["release"] # type: ignore
+ release_tuple = tuple(map(int, release.split(".")[:3])) # type: Tuple[int, ...]
+ except (TypeError, ValueError, AttributeError):
+ return None
+
+ return release_tuple
+
+
+def _is_contextvars_broken():
+ # type: () -> bool
+ """
+ Returns whether gevent/eventlet have patched the stdlib in a way where thread locals are now more "correct" than contextvars.
+ """
+ try:
+ import gevent
+ from gevent.monkey import is_object_patched
+
+ # Get the MAJOR and MINOR version numbers of Gevent
+ version_tuple = tuple(
+ [int(part) for part in re.split(r"a|b|rc|\.", gevent.__version__)[:2]]
+ )
+ if is_object_patched("threading", "local"):
+ # Gevent 20.9.0 depends on Greenlet 0.4.17 which natively handles switching
+ # context vars when greenlets are switched, so, Gevent 20.9.0+ is all fine.
+ # Ref: https://github.com/gevent/gevent/blob/83c9e2ae5b0834b8f84233760aabe82c3ba065b4/src/gevent/monkey.py#L604-L609
+ # Gevent 20.5, that doesn't depend on Greenlet 0.4.17 with native support
+ # for contextvars, is able to patch both thread locals and contextvars, in
+ # that case, check if contextvars are effectively patched.
+ if (
+ # Gevent 20.9.0+
+ (sys.version_info >= (3, 7) and version_tuple >= (20, 9))
+ # Gevent 20.5.0+ or Python < 3.7
+ or (is_object_patched("contextvars", "ContextVar"))
+ ):
+ return False
+
+ return True
+ except ImportError:
+ pass
+
+ try:
+ import greenlet
+ from eventlet.patcher import is_monkey_patched # type: ignore
+
+ greenlet_version = parse_version(greenlet.__version__)
+
+ if greenlet_version is None:
+ logger.error(
+ "Internal error in Sentry SDK: Could not parse Greenlet version from greenlet.__version__."
+ )
+ return False
+
+ if is_monkey_patched("thread") and greenlet_version < (0, 5):
+ return True
+ except ImportError:
+ pass
+
+ return False
+
+
+def _make_threadlocal_contextvars(local):
+ # type: (type) -> type
+ class ContextVar:
+ # Super-limited impl of ContextVar
+
+ def __init__(self, name, default=None):
+ # type: (str, Any) -> None
+ self._name = name
+ self._default = default
+ self._local = local()
+ self._original_local = local()
+
+ def get(self, default=None):
+ # type: (Any) -> Any
+ return getattr(self._local, "value", default or self._default)
+
+ def set(self, value):
+ # type: (Any) -> Any
+ token = str(random.getrandbits(64))
+ original_value = self.get()
+ setattr(self._original_local, token, original_value)
+ self._local.value = value
+ return token
+
+ def reset(self, token):
+ # type: (Any) -> None
+ self._local.value = getattr(self._original_local, token)
+ # delete the original value (this way it works in Python 3.6+)
+ del self._original_local.__dict__[token]
+
+ return ContextVar
+
+
+def _get_contextvars():
+ # type: () -> Tuple[bool, type]
+ """
+ Figure out the "right" contextvars installation to use. Returns a
+ `contextvars.ContextVar`-like class with a limited API.
+
+ See https://docs.sentry.io/platforms/python/contextvars/ for more information.
+ """
+ if not _is_contextvars_broken():
+ # aiocontextvars is a PyPI package that ensures that the contextvars
+ # backport (also a PyPI package) works with asyncio under Python 3.6
+ #
+ # Import it if available.
+ if sys.version_info < (3, 7):
+ # `aiocontextvars` is absolutely required for functional
+ # contextvars on Python 3.6.
+ try:
+ from aiocontextvars import ContextVar
+
+ return True, ContextVar
+ except ImportError:
+ pass
+ else:
+ # On Python 3.7 contextvars are functional.
+ try:
+ from contextvars import ContextVar
+
+ return True, ContextVar
+ except ImportError:
+ pass
+
+ # Fall back to basic thread-local usage.
+
+ from threading import local
+
+ return False, _make_threadlocal_contextvars(local)
+
+
+HAS_REAL_CONTEXTVARS, ContextVar = _get_contextvars()
+
+CONTEXTVARS_ERROR_MESSAGE = """
+
+With asyncio/ASGI applications, the Sentry SDK requires a functional
+installation of `contextvars` to avoid leaking scope/context data across
+requests.
+
+Please refer to https://docs.sentry.io/platforms/python/contextvars/ for more information.
+"""
+
+
+def qualname_from_function(func):
+ # type: (Callable[..., Any]) -> Optional[str]
+ """Return the qualified name of func. Works with regular function, lambda, partial and partialmethod."""
+ func_qualname = None # type: Optional[str]
+
+ # Python 2
+ try:
+ return "%s.%s.%s" % (
+ func.im_class.__module__, # type: ignore
+ func.im_class.__name__, # type: ignore
+ func.__name__,
+ )
+ except Exception:
+ pass
+
+ prefix, suffix = "", ""
+
+ if isinstance(func, partial) and hasattr(func.func, "__name__"):
+ prefix, suffix = "partial(<function ", ">)"
+ func = func.func
+ else:
+ # The _partialmethod attribute of methods wrapped with partialmethod() was renamed to __partialmethod__ in CPython 3.13:
+ # https://github.com/python/cpython/pull/16600
+ partial_method = getattr(func, "_partialmethod", None) or getattr(
+ func, "__partialmethod__", None
+ )
+ if isinstance(partial_method, partialmethod):
+ prefix, suffix = "partialmethod(<function ", ">)"
+ func = partial_method.func
+
+ if hasattr(func, "__qualname__"):
+ func_qualname = func.__qualname__
+ elif hasattr(func, "__name__"): # Python 2.7 has no __qualname__
+ func_qualname = func.__name__
+
+ # Python 3: methods, functions, classes
+ if func_qualname is not None:
+ if hasattr(func, "__module__") and isinstance(func.__module__, str):
+ func_qualname = func.__module__ + "." + func_qualname
+ func_qualname = prefix + func_qualname + suffix
+
+ return func_qualname
+
+
+def transaction_from_function(func):
+ # type: (Callable[..., Any]) -> Optional[str]
+ return qualname_from_function(func)
+
+
+disable_capture_event = ContextVar("disable_capture_event")
+
+
+class ServerlessTimeoutWarning(Exception): # noqa: N818
+ """Raised when a serverless method is about to reach its timeout."""
+
+ pass
+
+
+class TimeoutThread(threading.Thread):
+ """Creates a Thread which runs (sleeps) for a time duration equal to
+ waiting_time and raises a custom ServerlessTimeout exception.
+ """
+
+ def __init__(self, waiting_time, configured_timeout):
+ # type: (float, int) -> None
+ threading.Thread.__init__(self)
+ self.waiting_time = waiting_time
+ self.configured_timeout = configured_timeout
+ self._stop_event = threading.Event()
+
+ def stop(self):
+ # type: () -> None
+ self._stop_event.set()
+
+ def run(self):
+ # type: () -> None
+
+ self._stop_event.wait(self.waiting_time)
+
+ if self._stop_event.is_set():
+ return
+
+ integer_configured_timeout = int(self.configured_timeout)
+
+ # Setting up the exact integer value of configured time(in seconds)
+ if integer_configured_timeout < self.configured_timeout:
+ integer_configured_timeout = integer_configured_timeout + 1
+
+ # Raising Exception after timeout duration is reached
+ raise ServerlessTimeoutWarning(
+ "WARNING : Function is expected to get timed out. Configured timeout duration = {} seconds.".format(
+ integer_configured_timeout
+ )
+ )
+
+
+def to_base64(original):
+ # type: (str) -> Optional[str]
+ """
+ Convert a string to base64, via UTF-8. Returns None on invalid input.
+ """
+ base64_string = None
+
+ try:
+ utf8_bytes = original.encode("UTF-8")
+ base64_bytes = base64.b64encode(utf8_bytes)
+ base64_string = base64_bytes.decode("UTF-8")
+ except Exception as err:
+ logger.warning("Unable to encode {orig} to base64:".format(orig=original), err)
+
+ return base64_string
+
+
+def from_base64(base64_string):
+ # type: (str) -> Optional[str]
+ """
+ Convert a string from base64, via UTF-8. Returns None on invalid input.
+ """
+ utf8_string = None
+
+ try:
+ only_valid_chars = BASE64_ALPHABET.match(base64_string)
+ assert only_valid_chars
+
+ base64_bytes = base64_string.encode("UTF-8")
+ utf8_bytes = base64.b64decode(base64_bytes)
+ utf8_string = utf8_bytes.decode("UTF-8")
+ except Exception as err:
+ logger.warning(
+ "Unable to decode {b64} from base64:".format(b64=base64_string), err
+ )
+
+ return utf8_string
+
+
+Components = namedtuple("Components", ["scheme", "netloc", "path", "query", "fragment"])
+
+
+def sanitize_url(url, remove_authority=True, remove_query_values=True, split=False):
+ # type: (str, bool, bool, bool) -> Union[str, Components]
+ """
+ Removes the authority and query parameter values from a given URL.
+ """
+ parsed_url = urlsplit(url)
+ query_params = parse_qs(parsed_url.query, keep_blank_values=True)
+
+ # strip username:password (netloc can be usr:pwd@example.com)
+ if remove_authority:
+ netloc_parts = parsed_url.netloc.split("@")
+ if len(netloc_parts) > 1:
+ netloc = "%s:%s@%s" % (
+ SENSITIVE_DATA_SUBSTITUTE,
+ SENSITIVE_DATA_SUBSTITUTE,
+ netloc_parts[-1],
+ )
+ else:
+ netloc = parsed_url.netloc
+ else:
+ netloc = parsed_url.netloc
+
+ # strip values from query string
+ if remove_query_values:
+ query_string = unquote(
+ urlencode({key: SENSITIVE_DATA_SUBSTITUTE for key in query_params})
+ )
+ else:
+ query_string = parsed_url.query
+
+ components = Components(
+ scheme=parsed_url.scheme,
+ netloc=netloc,
+ query=query_string,
+ path=parsed_url.path,
+ fragment=parsed_url.fragment,
+ )
+
+ if split:
+ return components
+ else:
+ return urlunsplit(components)
+
+
+ParsedUrl = namedtuple("ParsedUrl", ["url", "query", "fragment"])
+
+
+def parse_url(url, sanitize=True):
+ # type: (str, bool) -> ParsedUrl
+ """
+ Splits a URL into a url (including path), query and fragment. If sanitize is True, the query
+ parameters will be sanitized to remove sensitive data. The autority (username and password)
+ in the URL will always be removed.
+ """
+ parsed_url = sanitize_url(
+ url, remove_authority=True, remove_query_values=sanitize, split=True
+ )
+
+ base_url = urlunsplit(
+ Components(
+ scheme=parsed_url.scheme, # type: ignore
+ netloc=parsed_url.netloc, # type: ignore
+ query="",
+ path=parsed_url.path, # type: ignore
+ fragment="",
+ )
+ )
+
+ return ParsedUrl(
+ url=base_url,
+ query=parsed_url.query, # type: ignore
+ fragment=parsed_url.fragment, # type: ignore
+ )
+
+
+def is_valid_sample_rate(rate, source):
+ # type: (Any, str) -> bool
+ """
+ Checks the given sample rate to make sure it is valid type and value (a
+ boolean or a number between 0 and 1, inclusive).
+ """
+
+ # both booleans and NaN are instances of Real, so a) checking for Real
+ # checks for the possibility of a boolean also, and b) we have to check
+ # separately for NaN and Decimal does not derive from Real so need to check that too
+ if not isinstance(rate, (Real, Decimal)) or math.isnan(rate):
+ logger.warning(
+ "{source} Given sample rate is invalid. Sample rate must be a boolean or a number between 0 and 1. Got {rate} of type {type}.".format(
+ source=source, rate=rate, type=type(rate)
+ )
+ )
+ return False
+
+ # in case rate is a boolean, it will get cast to 1 if it's True and 0 if it's False
+ rate = float(rate)
+ if rate < 0 or rate > 1:
+ logger.warning(
+ "{source} Given sample rate is invalid. Sample rate must be between 0 and 1. Got {rate}.".format(
+ source=source, rate=rate
+ )
+ )
+ return False
+
+ return True
+
+
+def match_regex_list(item, regex_list=None, substring_matching=False):
+ # type: (str, Optional[List[str]], bool) -> bool
+ if regex_list is None:
+ return False
+
+ for item_matcher in regex_list:
+ if not substring_matching and item_matcher[-1] != "$":
+ item_matcher += "$"
+
+ matched = re.search(item_matcher, item)
+ if matched:
+ return True
+
+ return False
+
+
+def is_sentry_url(client, url):
+ # type: (sentry_sdk.client.BaseClient, str) -> bool
+ """
+ Determines whether the given URL matches the Sentry DSN.
+ """
+ return (
+ client is not None
+ and client.transport is not None
+ and client.transport.parsed_dsn is not None
+ and client.transport.parsed_dsn.netloc in url
+ )
+
+
+def _generate_installed_modules():
+ # type: () -> Iterator[Tuple[str, str]]
+ try:
+ from importlib import metadata
+
+ yielded = set()
+ for dist in metadata.distributions():
+ name = dist.metadata.get("Name", None) # type: ignore[attr-defined]
+ # `metadata` values may be `None`, see:
+ # https://github.com/python/cpython/issues/91216
+ # and
+ # https://github.com/python/importlib_metadata/issues/371
+ if name is not None:
+ normalized_name = _normalize_module_name(name)
+ if dist.version is not None and normalized_name not in yielded:
+ yield normalized_name, dist.version
+ yielded.add(normalized_name)
+
+ except ImportError:
+ # < py3.8
+ try:
+ import pkg_resources
+ except ImportError:
+ return
+
+ for info in pkg_resources.working_set:
+ yield _normalize_module_name(info.key), info.version
+
+
+def _normalize_module_name(name):
+ # type: (str) -> str
+ return name.lower()
+
+
+def _get_installed_modules():
+ # type: () -> Dict[str, str]
+ global _installed_modules
+ if _installed_modules is None:
+ _installed_modules = dict(_generate_installed_modules())
+ return _installed_modules
+
+
+def package_version(package):
+ # type: (str) -> Optional[Tuple[int, ...]]
+ installed_packages = _get_installed_modules()
+ version = installed_packages.get(package)
+ if version is None:
+ return None
+
+ return parse_version(version)
+
+
+def reraise(tp, value, tb=None):
+ # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[Any]) -> NoReturn
+ assert value is not None
+ if value.__traceback__ is not tb:
+ raise value.with_traceback(tb)
+ raise value
+
+
+def _no_op(*_a, **_k):
+ # type: (*Any, **Any) -> None
+ """No-op function for ensure_integration_enabled."""
+ pass
+
+
+if TYPE_CHECKING:
+
+ @overload
+ def ensure_integration_enabled(
+ integration, # type: type[sentry_sdk.integrations.Integration]
+ original_function, # type: Callable[P, R]
+ ):
+ # type: (...) -> Callable[[Callable[P, R]], Callable[P, R]]
+ ...
+
+ @overload
+ def ensure_integration_enabled(
+ integration, # type: type[sentry_sdk.integrations.Integration]
+ ):
+ # type: (...) -> Callable[[Callable[P, None]], Callable[P, None]]
+ ...
+
+
+def ensure_integration_enabled(
+ integration, # type: type[sentry_sdk.integrations.Integration]
+ original_function=_no_op, # type: Union[Callable[P, R], Callable[P, None]]
+):
+ # type: (...) -> Callable[[Callable[P, R]], Callable[P, R]]
+ """
+ Ensures a given integration is enabled prior to calling a Sentry-patched function.
+
+ The function takes as its parameters the integration that must be enabled and the original
+ function that the SDK is patching. The function returns a function that takes the
+ decorated (Sentry-patched) function as its parameter, and returns a function that, when
+ called, checks whether the given integration is enabled. If the integration is enabled, the
+ function calls the decorated, Sentry-patched function. If the integration is not enabled,
+ the original function is called.
+
+ The function also takes care of preserving the original function's signature and docstring.
+
+ Example usage:
+
+ ```python
+ @ensure_integration_enabled(MyIntegration, my_function)
+ def patch_my_function():
+ with sentry_sdk.start_transaction(...):
+ return my_function()
+ ```
+ """
+ if TYPE_CHECKING:
+ # Type hint to ensure the default function has the right typing. The overloads
+ # ensure the default _no_op function is only used when R is None.
+ original_function = cast(Callable[P, R], original_function)
+
+ def patcher(sentry_patched_function):
+ # type: (Callable[P, R]) -> Callable[P, R]
+ def runner(*args: "P.args", **kwargs: "P.kwargs"):
+ # type: (...) -> R
+ if sentry_sdk.get_client().get_integration(integration) is None:
+ return original_function(*args, **kwargs)
+
+ return sentry_patched_function(*args, **kwargs)
+
+ if original_function is _no_op:
+ return wraps(sentry_patched_function)(runner)
+
+ return wraps(original_function)(runner)
+
+ return patcher
+
+
+if PY37:
+
+ def nanosecond_time():
+ # type: () -> int
+ return time.perf_counter_ns()
+
+else:
+
+ def nanosecond_time():
+ # type: () -> int
+ return int(time.perf_counter() * 1e9)
+
+
+def now():
+ # type: () -> float
+ return time.perf_counter()
+
+
+try:
+ from gevent import get_hub as get_gevent_hub
+ from gevent.monkey import is_module_patched
+except ImportError:
+
+ # it's not great that the signatures are different, get_hub can't return None
+ # consider adding an if TYPE_CHECKING to change the signature to Optional[Hub]
+ def get_gevent_hub(): # type: ignore[misc]
+ # type: () -> Optional[Hub]
+ return None
+
+ def is_module_patched(mod_name):
+ # type: (str) -> bool
+ # unable to import from gevent means no modules have been patched
+ return False
+
+
+def is_gevent():
+ # type: () -> bool
+ return is_module_patched("threading") or is_module_patched("_thread")
+
+
+def get_current_thread_meta(thread=None):
+ # type: (Optional[threading.Thread]) -> Tuple[Optional[int], Optional[str]]
+ """
+ Try to get the id of the current thread, with various fall backs.
+ """
+
+ # if a thread is specified, that takes priority
+ if thread is not None:
+ try:
+ thread_id = thread.ident
+ thread_name = thread.name
+ if thread_id is not None:
+ return thread_id, thread_name
+ except AttributeError:
+ pass
+
+ # if the app is using gevent, we should look at the gevent hub first
+ # as the id there differs from what the threading module reports
+ if is_gevent():
+ gevent_hub = get_gevent_hub()
+ if gevent_hub is not None:
+ try:
+ # this is undocumented, so wrap it in try except to be safe
+ return gevent_hub.thread_ident, None
+ except AttributeError:
+ pass
+
+ # use the current thread's id if possible
+ try:
+ thread = threading.current_thread()
+ thread_id = thread.ident
+ thread_name = thread.name
+ if thread_id is not None:
+ return thread_id, thread_name
+ except AttributeError:
+ pass
+
+ # if we can't get the current thread id, fall back to the main thread id
+ try:
+ thread = threading.main_thread()
+ thread_id = thread.ident
+ thread_name = thread.name
+ if thread_id is not None:
+ return thread_id, thread_name
+ except AttributeError:
+ pass
+
+ # we've tried everything, time to give up
+ return None, None
+
+
+def should_be_treated_as_error(ty, value):
+ # type: (Any, Any) -> bool
+ if ty == SystemExit and hasattr(value, "code") and value.code in (0, None):
+ # https://docs.python.org/3/library/exceptions.html#SystemExit
+ return False
+
+ return True
+
+
+if TYPE_CHECKING:
+ T = TypeVar("T")
+
+
+def try_convert(convert_func, value):
+ # type: (Callable[[Any], T], Any) -> Optional[T]
+ """
+ Attempt to convert from an unknown type to a specific type, using the
+ given function. Return None if the conversion fails, i.e. if the function
+ raises an exception.
+ """
+ try:
+ return convert_func(value)
+ except Exception:
+ return None