aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/client.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/client.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/client.py1079
1 file changed, 1079 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/client.py b/.venv/lib/python3.12/site-packages/sentry_sdk/client.py
new file mode 100644
index 00000000..0f973945
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/client.py
@@ -0,0 +1,1079 @@
+import json
+import os
+import time
+import uuid
+import random
+import socket
+import logging
+from collections.abc import Mapping
+from datetime import datetime, timezone
+from importlib import import_module
+from typing import TYPE_CHECKING, List, Dict, cast, overload
+import warnings
+
+from sentry_sdk._compat import PY37, check_uwsgi_thread_support
+from sentry_sdk.utils import (
+ AnnotatedValue,
+ ContextVar,
+ capture_internal_exceptions,
+ current_stacktrace,
+ env_to_bool,
+ format_timestamp,
+ get_sdk_name,
+ get_type_name,
+ get_default_release,
+ handle_in_app,
+ is_gevent,
+ logger,
+)
+from sentry_sdk.serializer import serialize
+from sentry_sdk.tracing import trace
+from sentry_sdk.transport import BaseHttpTransport, make_transport
+from sentry_sdk.consts import (
+ DEFAULT_MAX_VALUE_LENGTH,
+ DEFAULT_OPTIONS,
+ INSTRUMENTER,
+ VERSION,
+ ClientConstructor,
+)
+from sentry_sdk.integrations import _DEFAULT_INTEGRATIONS, setup_integrations
+from sentry_sdk.integrations.dedupe import DedupeIntegration
+from sentry_sdk.sessions import SessionFlusher
+from sentry_sdk.envelope import Envelope
+from sentry_sdk.profiler.continuous_profiler import setup_continuous_profiler
+from sentry_sdk.profiler.transaction_profiler import (
+ has_profiling_enabled,
+ Profile,
+ setup_profiler,
+)
+from sentry_sdk.scrubber import EventScrubber
+from sentry_sdk.monitor import Monitor
+from sentry_sdk.spotlight import setup_spotlight
+
+if TYPE_CHECKING:
+ from typing import Any
+ from typing import Callable
+ from typing import Optional
+ from typing import Sequence
+ from typing import Type
+ from typing import Union
+ from typing import TypeVar
+
+ from sentry_sdk._types import Event, Hint, SDKInfo, Log
+ from sentry_sdk.integrations import Integration
+ from sentry_sdk.metrics import MetricsAggregator
+ from sentry_sdk.scope import Scope
+ from sentry_sdk.session import Session
+ from sentry_sdk.spotlight import SpotlightClient
+ from sentry_sdk.transport import Transport
+
+ I = TypeVar("I", bound=Integration) # noqa: E741
+
# Holds the value of the `debug` option while a client is being constructed
# (set/restored in `_Client._init_impl`), so SDK-internal logging can honor
# `debug` before initialization has finished.
_client_init_debug = ContextVar("client_init_debug")


SDK_INFO = {
    "name": "sentry.python", # SDK name will be overridden after integrations have been loaded with sentry_sdk.integrations.setup_integrations()
    "version": VERSION,
    "packages": [{"name": "pypi:sentry-sdk", "version": VERSION}],
} # type: SDKInfo
+
+
def _get_options(*args, **kwargs):
    # type: (*Optional[str], **Any) -> Dict[str, Any]
    """Resolve the final client options dict.

    Accepts an optional DSN (str/bytes or None) as the first positional
    argument followed by keyword options. Starts from ``DEFAULT_OPTIONS``,
    overlays caller-supplied values (unknown option names raise
    ``TypeError``), then fills remaining gaps from ``SENTRY_*`` environment
    variables and computed defaults.
    """
    # A leading str/bytes/None positional argument is treated as the DSN.
    if args and (isinstance(args[0], (bytes, str)) or args[0] is None):
        dsn = args[0] # type: Optional[str]
        args = args[1:]
    else:
        dsn = None

    if len(args) > 1:
        raise TypeError("Only single positional argument is expected")

    rv = dict(DEFAULT_OPTIONS)
    options = dict(*args, **kwargs)
    # An explicit `dsn=` keyword wins over the positional DSN.
    if dsn is not None and options.get("dsn") is None:
        options["dsn"] = dsn

    for key, value in options.items():
        if key not in rv:
            raise TypeError("Unknown option %r" % (key,))

        rv[key] = value

    # Environment-variable and computed fallbacks for anything still unset.
    if rv["dsn"] is None:
        rv["dsn"] = os.environ.get("SENTRY_DSN")

    if rv["release"] is None:
        rv["release"] = get_default_release()

    if rv["environment"] is None:
        rv["environment"] = os.environ.get("SENTRY_ENVIRONMENT") or "production"

    if rv["debug"] is None:
        rv["debug"] = env_to_bool(os.environ.get("SENTRY_DEBUG", "False"), strict=True)

    if rv["server_name"] is None and hasattr(socket, "gethostname"):
        rv["server_name"] = socket.gethostname()

    if rv["instrumenter"] is None:
        rv["instrumenter"] = INSTRUMENTER.SENTRY

    if rv["project_root"] is None:
        # os.getcwd() can fail (e.g. deleted working directory); fall back
        # to None rather than crashing client construction.
        try:
            project_root = os.getcwd()
        except Exception:
            project_root = None

        rv["project_root"] = project_root

    if rv["enable_tracing"] is True and rv["traces_sample_rate"] is None:
        rv["traces_sample_rate"] = 1.0

    if rv["event_scrubber"] is None:
        rv["event_scrubber"] = EventScrubber(
            send_default_pii=(
                False if rv["send_default_pii"] is None else rv["send_default_pii"]
            )
        )

    # Malformed socket_options are dropped (with a warning) instead of being
    # passed through to urllib3, where they would fail later.
    if rv["socket_options"] and not isinstance(rv["socket_options"], list):
        logger.warning(
            "Ignoring socket_options because of unexpected format. See urllib3.HTTPConnection.socket_options for the expected format."
        )
        rv["socket_options"] = None

    if rv["enable_tracing"] is not None:
        warnings.warn(
            "The `enable_tracing` parameter is deprecated. Please use `traces_sample_rate` instead.",
            DeprecationWarning,
            stacklevel=2,
        )

    return rv
+
+
# Alias kept for backward compatibility with code that imports it.
#
# Historically this was a try/except shim that fell back to `ImportError`
# on Python < 3.6. This module already requires Python 3.7+ (it uses
# f-strings, `time.time_ns()` and `datetime.timezone`), where
# `ModuleNotFoundError` always exists, so the fallback was dead code.
# `ModuleNotFoundError` is a subclass of `ImportError`, so exception
# handlers using this alias behave exactly as before.
module_not_found_error = ModuleNotFoundError
+
+
class BaseClient:
    """
    .. versionadded:: 2.0.0

    The basic definition of a client that is used for sending data to Sentry.
    """

    # The base client never talks to Spotlight.
    spotlight = None # type: Optional[SpotlightClient]

    def __init__(self, options=None):
        # type: (Optional[Dict[str, Any]]) -> None
        if options is None:
            options = DEFAULT_OPTIONS
        self.options = options # type: Dict[str, Any]

        self.transport = None # type: Optional[Transport]
        self.monitor = None # type: Optional[Monitor]
        self.metrics_aggregator = None # type: Optional[MetricsAggregator]

    def __getstate__(self, *args, **kwargs):
        # type: (*Any, **Any) -> Any
        # Pickling a base client deliberately drops all configuration.
        return {"options": {}}

    def __setstate__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        # Nothing to restore; see __getstate__.
        pass

    @property
    def dsn(self):
        # type: () -> Optional[str]
        """The configured DSN; always None for the base client."""
        return None

    def should_send_default_pii(self):
        # type: () -> bool
        return False

    def is_active(self):
        # type: () -> bool
        """
        .. versionadded:: 2.0.0

        Returns whether the client is active (able to send data to Sentry)
        """
        return False

    def capture_event(self, *args, **kwargs):
        # type: (*Any, **Any) -> Optional[str]
        # No-op: nothing is recorded, no event ID is produced.
        return None

    def capture_log(self, scope, severity_text, severity_number, template, **kwargs):
        # type: (Scope, str, int, str, **Any) -> None
        pass

    def capture_session(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        return None

    if TYPE_CHECKING:

        @overload
        def get_integration(self, name_or_class):
            # type: (str) -> Optional[Integration]
            ...

        @overload
        def get_integration(self, name_or_class):
            # type: (type[I]) -> Optional[I]
            ...

    def get_integration(self, name_or_class):
        # type: (Union[str, type[Integration]]) -> Optional[Integration]
        # The base client has no integrations to look up.
        return None

    def close(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        return None

    def flush(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        return None

    def __enter__(self):
        # type: () -> BaseClient
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # type: (Any, Any, Any) -> None
        return None
+
+
class NonRecordingClient(BaseClient):
    """
    .. versionadded:: 2.0.0

    A client that does not send any events to Sentry. This is used as a fallback when the Sentry SDK is not yet initialized.
    """

    # Everything is inherited from BaseClient, which already implements all
    # capture_* / flush / close operations as no-ops.
    pass
+
+
+class _Client(BaseClient):
+ """
+ The client is internally responsible for capturing the events and
+ forwarding them to sentry through the configured transport. It takes
+ the client options as keyword arguments and optionally the DSN as first
+ argument.
+
+ Alias of :py:class:`sentry_sdk.Client`. (Was created for better intelisense support)
+ """
+
+ def __init__(self, *args, **kwargs):
+ # type: (*Any, **Any) -> None
+ super(_Client, self).__init__(options=get_options(*args, **kwargs))
+ self._init_impl()
+
    def __getstate__(self):
        # type: () -> Any
        # Unlike BaseClient, a real client pickles its full options so it
        # can be rebuilt by __setstate__.
        return {"options": self.options}
+
    def __setstate__(self, state):
        # type: (Any) -> None
        # Restore options and re-run full initialization (transport,
        # integrations, profilers, ...) from them.
        self.options = state["options"]
        self._init_impl()
+
    def _setup_instrumentation(self, functions_to_trace):
        # type: (Sequence[Dict[str, str]]) -> None
        """
        Instruments the functions given in the list `functions_to_trace` with the `@sentry_sdk.tracing.trace` decorator.

        Each entry's "qualified_name" is first interpreted as
        `module.function`; if importing that module fails with
        ModuleNotFoundError, it is retried as `module.Class.method`.
        Failures are logged as warnings and do not abort client init.
        """
        for function in functions_to_trace:
            class_name = None
            function_qualname = function["qualified_name"]
            module_name, function_name = function_qualname.rsplit(".", 1)

            try:
                # Try to import module and function
                # ex: "mymodule.submodule.funcname"

                module_obj = import_module(module_name)
                function_obj = getattr(module_obj, function_name)
                setattr(module_obj, function_name, trace(function_obj))
                logger.debug("Enabled tracing for %s", function_qualname)
            except module_not_found_error:
                try:
                    # Try to import a class
                    # ex: "mymodule.submodule.MyClassName.member_function"

                    module_name, class_name = module_name.rsplit(".", 1)
                    module_obj = import_module(module_name)
                    class_obj = getattr(module_obj, class_name)
                    function_obj = getattr(class_obj, function_name)
                    # Inspect the raw class __dict__ entry to detect
                    # staticmethod/classmethod (getattr would unwrap them).
                    function_type = type(class_obj.__dict__[function_name])
                    traced_function = trace(function_obj)

                    if function_type in (staticmethod, classmethod):
                        traced_function = staticmethod(traced_function)

                    setattr(class_obj, function_name, traced_function)
                    setattr(module_obj, class_name, class_obj)
                    logger.debug("Enabled tracing for %s", function_qualname)

                except Exception as e:
                    logger.warning(
                        "Can not enable tracing for '%s'. (%s) Please check your `functions_to_trace` parameter.",
                        function_qualname,
                        e,
                    )

            except Exception as e:
                logger.warning(
                    "Can not enable tracing for '%s'. (%s) Please check your `functions_to_trace` parameter.",
                    function_qualname,
                    e,
                )
+
    def _init_impl(self):
        # type: () -> None
        """Build the client's runtime machinery from `self.options`:
        transport, backpressure monitor, session flusher, metrics
        aggregator, integrations, Spotlight and profilers.

        Shared by `__init__` and `__setstate__` so unpickled clients are
        fully re-initialized.
        """
        old_debug = _client_init_debug.get(False)

        def _capture_envelope(envelope):
            # type: (Envelope) -> None
            # Shared capture callback handed to flusher/metrics/profiler.
            if self.transport is not None:
                self.transport.capture_envelope(envelope)

        try:
            # Make the `debug` option visible to SDK-internal logging while
            # we initialize; restored in the `finally` below.
            _client_init_debug.set(self.options["debug"])
            self.transport = make_transport(self.options)

            self.monitor = None
            if self.transport:
                if self.options["enable_backpressure_handling"]:
                    self.monitor = Monitor(self.transport)

            self.session_flusher = SessionFlusher(capture_func=_capture_envelope)

            self.metrics_aggregator = None # type: Optional[MetricsAggregator]
            experiments = self.options.get("_experiments", {})
            if experiments.get("enable_metrics", True):
                # Context vars are not working correctly on Python <=3.6
                # with gevent.
                metrics_supported = not is_gevent() or PY37
                if metrics_supported:
                    from sentry_sdk.metrics import MetricsAggregator

                    self.metrics_aggregator = MetricsAggregator(
                        capture_func=_capture_envelope,
                        enable_code_locations=bool(
                            experiments.get("metric_code_locations", True)
                        ),
                    )
                else:
                    logger.info(
                        "Metrics not supported on Python 3.6 and lower with gevent."
                    )

            max_request_body_size = ("always", "never", "small", "medium")
            if self.options["max_request_body_size"] not in max_request_body_size:
                raise ValueError(
                    "Invalid value for max_request_body_size. Must be one of {}".format(
                        max_request_body_size
                    )
                )

            if self.options["_experiments"].get("otel_powered_performance", False):
                logger.debug(
                    "[OTel] Enabling experimental OTel-powered performance monitoring."
                )
                self.options["instrumenter"] = INSTRUMENTER.OTEL
                # Mutates the module-level default integration list so the
                # OTel integration is picked up by setup_integrations below.
                if (
                    "sentry_sdk.integrations.opentelemetry.integration.OpenTelemetryIntegration"
                    not in _DEFAULT_INTEGRATIONS
                ):
                    _DEFAULT_INTEGRATIONS.append(
                        "sentry_sdk.integrations.opentelemetry.integration.OpenTelemetryIntegration",
                    )

            self.integrations = setup_integrations(
                self.options["integrations"],
                with_defaults=self.options["default_integrations"],
                with_auto_enabling_integrations=self.options[
                    "auto_enabling_integrations"
                ],
                disabled_integrations=self.options["disabled_integrations"],
            )

            # The SENTRY_SPOTLIGHT env var only applies when the option was
            # not set explicitly; non-boolean values are kept as-is (URL).
            spotlight_config = self.options.get("spotlight")
            if spotlight_config is None and "SENTRY_SPOTLIGHT" in os.environ:
                spotlight_env_value = os.environ["SENTRY_SPOTLIGHT"]
                spotlight_config = env_to_bool(spotlight_env_value, strict=True)
                self.options["spotlight"] = (
                    spotlight_config
                    if spotlight_config is not None
                    else spotlight_env_value
                )

            if self.options.get("spotlight"):
                self.spotlight = setup_spotlight(self.options)

            # SDK name depends on the loaded integrations (module-level
            # SDK_INFO is updated in place).
            sdk_name = get_sdk_name(list(self.integrations.keys()))
            SDK_INFO["name"] = sdk_name
            logger.debug("Setting SDK name to '%s'", sdk_name)

            if has_profiling_enabled(self.options):
                try:
                    setup_profiler(self.options)
                except Exception as e:
                    logger.debug("Can not set up profiler. (%s)", e)
            else:
                try:
                    setup_continuous_profiler(
                        self.options,
                        sdk_info=SDK_INFO,
                        capture_func=_capture_envelope,
                    )
                except Exception as e:
                    logger.debug("Can not set up continuous profiler. (%s)", e)

        finally:
            _client_init_debug.set(old_debug)

        self._setup_instrumentation(self.options.get("functions_to_trace", []))

        if (
            self.monitor
            or self.metrics_aggregator
            or has_profiling_enabled(self.options)
            or isinstance(self.transport, BaseHttpTransport)
        ):
            # If we have anything on that could spawn a background thread, we
            # need to check if it's safe to use them.
            check_uwsgi_thread_support()
+
    def is_active(self):
        # type: () -> bool
        """
        .. versionadded:: 2.0.0

        Returns whether the client is active (able to send data to Sentry)
        """
        # A fully initialized client is always active, in contrast to
        # BaseClient/NonRecordingClient which report False.
        return True
+
+ def should_send_default_pii(self):
+ # type: () -> bool
+ """
+ .. versionadded:: 2.0.0
+
+ Returns whether the client should send default PII (Personally Identifiable Information) data to Sentry.
+ """
+ result = self.options.get("send_default_pii")
+ if result is None:
+ result = not self.options["dsn"] and self.spotlight is not None
+
+ return result
+
    @property
    def dsn(self):
        # type: () -> Optional[str]
        """Returns the configured DSN as string."""
        # Taken verbatim from the resolved options (see _get_options).
        return self.options["dsn"]
+
    def _prepare_event(
        self,
        event, # type: Event
        hint, # type: Hint
        scope, # type: Optional[Scope]
    ):
        # type: (...) -> Optional[Event]
        """Run `event` through the processing pipeline: scope/event
        processors, optional stack-trace attachment, metadata defaults,
        in-app classification, scrubbing, serialization, and the
        `before_send` / `before_send_transaction` hooks.

        Returns the processed event, or None if any stage dropped it; drops
        are recorded as lost events on the transport where applicable.
        """

        # Set when spans were dropped, so the final span list can be
        # annotated with the original total length.
        previous_total_spans = None # type: Optional[int]

        if event.get("timestamp") is None:
            event["timestamp"] = datetime.now(timezone.utc)

        if scope is not None:
            is_transaction = event.get("type") == "transaction"
            spans_before = len(cast(List[Dict[str, object]], event.get("spans", [])))
            event_ = scope.apply_to_event(event, hint, self.options)

            # one of the event/error processors returned None
            if event_ is None:
                if self.transport:
                    self.transport.record_lost_event(
                        "event_processor",
                        data_category=("transaction" if is_transaction else "error"),
                    )
                    if is_transaction:
                        self.transport.record_lost_event(
                            "event_processor",
                            data_category="span",
                            quantity=spans_before + 1, # +1 for the transaction itself
                        )
                return None

            event = event_
            # Event processors may have removed spans; record them as lost.
            spans_delta = spans_before - len(
                cast(List[Dict[str, object]], event.get("spans", []))
            )
            if is_transaction and spans_delta > 0 and self.transport is not None:
                self.transport.record_lost_event(
                    "event_processor", data_category="span", quantity=spans_delta
                )

            dropped_spans = event.pop("_dropped_spans", 0) + spans_delta # type: int
            if dropped_spans > 0:
                previous_total_spans = spans_before + dropped_spans

        # Only attach a synthetic stack trace if the event carries no other
        # stack information.
        if (
            self.options["attach_stacktrace"]
            and "exception" not in event
            and "stacktrace" not in event
            and "threads" not in event
        ):
            with capture_internal_exceptions():
                event["threads"] = {
                    "values": [
                        {
                            "stacktrace": current_stacktrace(
                                include_local_variables=self.options.get(
                                    "include_local_variables", True
                                ),
                                max_value_length=self.options.get(
                                    "max_value_length", DEFAULT_MAX_VALUE_LENGTH
                                ),
                            ),
                            "crashed": False,
                            "current": True,
                        }
                    ]
                }

        # Fill event metadata from options without overriding explicit values.
        for key in "release", "environment", "server_name", "dist":
            if event.get(key) is None and self.options[key] is not None:
                event[key] = str(self.options[key]).strip()
        if event.get("sdk") is None:
            sdk_info = dict(SDK_INFO)
            sdk_info["integrations"] = sorted(self.integrations.keys())
            event["sdk"] = sdk_info

        if event.get("platform") is None:
            event["platform"] = "python"

        event = handle_in_app(
            event,
            self.options["in_app_exclude"],
            self.options["in_app_include"],
            self.options["project_root"],
        )

        if event is not None:
            event_scrubber = self.options["event_scrubber"]
            if event_scrubber:
                event_scrubber.scrub_event(event)

        if previous_total_spans is not None:
            event["spans"] = AnnotatedValue(
                event.get("spans", []), {"len": previous_total_spans}
            )

        # Postprocess the event here so that annotated types do
        # generally not surface in before_send
        if event is not None:
            event = cast(
                "Event",
                serialize(
                    cast("Dict[str, Any]", event),
                    max_request_body_size=self.options.get("max_request_body_size"),
                    max_value_length=self.options.get("max_value_length"),
                    custom_repr=self.options.get("custom_repr"),
                ),
            )

        # `before_send` applies to non-transaction events only.
        before_send = self.options["before_send"]
        if (
            before_send is not None
            and event is not None
            and event.get("type") != "transaction"
        ):
            new_event = None
            with capture_internal_exceptions():
                new_event = before_send(event, hint or {})
            if new_event is None:
                logger.info("before send dropped event")
                if self.transport:
                    self.transport.record_lost_event(
                        "before_send", data_category="error"
                    )

                # If this is an exception, reset the DedupeIntegration. It still
                # remembers the dropped exception as the last exception, meaning
                # that if the same exception happens again and is not dropped
                # in before_send, it'd get dropped by DedupeIntegration.
                if event.get("exception"):
                    DedupeIntegration.reset_last_seen()

            event = new_event

        # `before_send_transaction` applies to transaction events only.
        before_send_transaction = self.options["before_send_transaction"]
        if (
            before_send_transaction is not None
            and event is not None
            and event.get("type") == "transaction"
        ):
            new_event = None
            spans_before = len(cast(List[Dict[str, object]], event.get("spans", [])))
            with capture_internal_exceptions():
                new_event = before_send_transaction(event, hint or {})
            if new_event is None:
                logger.info("before send transaction dropped event")
                if self.transport:
                    self.transport.record_lost_event(
                        reason="before_send", data_category="transaction"
                    )
                    self.transport.record_lost_event(
                        reason="before_send",
                        data_category="span",
                        quantity=spans_before + 1, # +1 for the transaction itself
                    )
            else:
                spans_delta = spans_before - len(new_event.get("spans", []))
                if spans_delta > 0 and self.transport is not None:
                    self.transport.record_lost_event(
                        reason="before_send", data_category="span", quantity=spans_delta
                    )

            event = new_event

        return event
+
+ def _is_ignored_error(self, event, hint):
+ # type: (Event, Hint) -> bool
+ exc_info = hint.get("exc_info")
+ if exc_info is None:
+ return False
+
+ error = exc_info[0]
+ error_type_name = get_type_name(exc_info[0])
+ error_full_name = "%s.%s" % (exc_info[0].__module__, error_type_name)
+
+ for ignored_error in self.options["ignore_errors"]:
+ # String types are matched against the type name in the
+ # exception only
+ if isinstance(ignored_error, str):
+ if ignored_error == error_full_name or ignored_error == error_type_name:
+ return True
+ else:
+ if issubclass(error, ignored_error):
+ return True
+
+ return False
+
+ def _should_capture(
+ self,
+ event, # type: Event
+ hint, # type: Hint
+ scope=None, # type: Optional[Scope]
+ ):
+ # type: (...) -> bool
+ # Transactions are sampled independent of error events.
+ is_transaction = event.get("type") == "transaction"
+ if is_transaction:
+ return True
+
+ ignoring_prevents_recursion = scope is not None and not scope._should_capture
+ if ignoring_prevents_recursion:
+ return False
+
+ ignored_by_config_option = self._is_ignored_error(event, hint)
+ if ignored_by_config_option:
+ return False
+
+ return True
+
    def _should_sample_error(
        self,
        event, # type: Event
        hint, # type: Hint
    ):
        # type: (...) -> bool
        """Decide whether an error event survives client-side sampling.

        Uses the `error_sampler` callable when configured, otherwise the
        static `sample_rate`. Returns True when the event should be sent;
        dropped events are recorded as lost with reason "sample_rate".
        """
        error_sampler = self.options.get("error_sampler", None)

        if callable(error_sampler):
            with capture_internal_exceptions():
                sample_rate = error_sampler(event, hint)
        else:
            sample_rate = self.options["sample_rate"]

        try:
            not_in_sample_rate = sample_rate < 1.0 and random.random() >= sample_rate
        except NameError:
            # If error_sampler raised, capture_internal_exceptions()
            # swallowed it and `sample_rate` was never bound, so the
            # comparison above raises NameError — handled here.
            logger.warning(
                "The provided error_sampler raised an error. Defaulting to sampling the event."
            )

            # If the error_sampler raised an error, we should sample the event, since the default behavior
            # (when no sample_rate or error_sampler is provided) is to sample all events.
            not_in_sample_rate = False
        except TypeError:
            # sample_rate was not comparable to a float (bad user value).
            parameter, verb = (
                ("error_sampler", "returned")
                if callable(error_sampler)
                else ("sample_rate", "contains")
            )
            logger.warning(
                "The provided %s %s an invalid value of %s. The value should be a float or a bool. Defaulting to sampling the event."
                % (parameter, verb, repr(sample_rate))
            )

            # If the sample_rate has an invalid value, we should sample the event, since the default behavior
            # (when no sample_rate or error_sampler is provided) is to sample all events.
            not_in_sample_rate = False

        if not_in_sample_rate:
            # because we will not sample this event, record a "lost event".
            if self.transport:
                self.transport.record_lost_event("sample_rate", data_category="error")

            return False

        return True
+
+ def _update_session_from_event(
+ self,
+ session, # type: Session
+ event, # type: Event
+ ):
+ # type: (...) -> None
+
+ crashed = False
+ errored = False
+ user_agent = None
+
+ exceptions = (event.get("exception") or {}).get("values")
+ if exceptions:
+ errored = True
+ for error in exceptions:
+ mechanism = error.get("mechanism")
+ if isinstance(mechanism, Mapping) and mechanism.get("handled") is False:
+ crashed = True
+ break
+
+ user = event.get("user")
+
+ if session.user_agent is None:
+ headers = (event.get("request") or {}).get("headers")
+ headers_dict = headers if isinstance(headers, dict) else {}
+ for k, v in headers_dict.items():
+ if k.lower() == "user-agent":
+ user_agent = v
+ break
+
+ session.update(
+ status="crashed" if crashed else None,
+ user=user,
+ user_agent=user_agent,
+ errors=session.errors + (errored or crashed),
+ )
+
    def capture_event(
        self,
        event, # type: Event
        hint=None, # type: Optional[Hint]
        scope=None, # type: Optional[Scope]
    ):
        # type: (...) -> Optional[str]
        """Captures an event.

        :param event: A ready-made event that can be directly sent to Sentry.

        :param hint: Contains metadata about the event that can be read from `before_send`, such as the original exception object or a HTTP request object.

        :param scope: An optional :py:class:`sentry_sdk.Scope` to apply to events.

        :returns: An event ID. May be `None` if there is no DSN set or of if the SDK decided to discard the event for other reasons. In such situations setting `debug=True` on `init()` may help.
        """
        hint = dict(hint or ()) # type: Hint

        if not self._should_capture(event, hint, scope):
            return None

        # The profile rides along in the event dict; detach it before
        # processing and re-attach it to the envelope below.
        profile = event.pop("profile", None)

        event_id = event.get("event_id")
        if event_id is None:
            event["event_id"] = event_id = uuid.uuid4().hex
        event_opt = self._prepare_event(event, hint, scope)
        if event_opt is None:
            return None

        # whenever we capture an event we also check if the session needs
        # to be updated based on that information.
        session = scope._session if scope else None
        if session:
            self._update_session_from_event(session, event)

        is_transaction = event_opt.get("type") == "transaction"
        is_checkin = event_opt.get("type") == "check_in"

        # Only plain error events are subject to client-side sampling.
        if (
            not is_transaction
            and not is_checkin
            and not self._should_sample_error(event, hint)
        ):
            return None

        attachments = hint.get("attachments")

        # The dynamic sampling context travels as an envelope header, not
        # inside the event payload.
        trace_context = event_opt.get("contexts", {}).get("trace") or {}
        dynamic_sampling_context = trace_context.pop("dynamic_sampling_context", {})

        headers = {
            "event_id": event_opt["event_id"],
            "sent_at": format_timestamp(datetime.now(timezone.utc)),
        } # type: dict[str, object]

        if dynamic_sampling_context:
            headers["trace"] = dynamic_sampling_context

        envelope = Envelope(headers=headers)

        if is_transaction:
            if isinstance(profile, Profile):
                envelope.add_profile(profile.to_json(event_opt, self.options))
            envelope.add_transaction(event_opt)
        elif is_checkin:
            envelope.add_checkin(event_opt)
        else:
            envelope.add_event(event_opt)

        for attachment in attachments or ():
            envelope.add_item(attachment.to_envelope_item())

        # The event ID is returned if either Spotlight or the transport
        # actually received the envelope.
        return_value = None
        if self.spotlight:
            self.spotlight.capture_envelope(envelope)
            return_value = event_id

        if self.transport is not None:
            self.transport.capture_envelope(envelope)
            return_value = event_id

        return return_value
+
    def capture_log(self, scope, severity_text, severity_number, template, **kwargs):
        # type: (Scope, str, int, str, **Any) -> None
        """Format `template` with `kwargs` and send it as a structured log
        in OTel log format. No-op unless the experimental
        `_experiments["enable_sentry_logs"]` option is enabled.
        """
        logs_enabled = self.options["_experiments"].get("enable_sentry_logs", False)
        if not logs_enabled:
            return

        headers = {
            "sent_at": format_timestamp(datetime.now(timezone.utc)),
        } # type: dict[str, object]

        attrs = {
            "sentry.message.template": template,
        } # type: dict[str, str | bool | float | int]

        # Caller-supplied attributes are merged in first so the sentry.*
        # keys below take precedence.
        kwargs_attributes = kwargs.get("attributes")
        if kwargs_attributes is not None:
            attrs.update(kwargs_attributes)

        environment = self.options.get("environment")
        if environment is not None:
            attrs["sentry.environment"] = environment

        release = self.options.get("release")
        if release is not None:
            attrs["sentry.release"] = release

        span = scope.span
        if span is not None:
            attrs["sentry.trace.parent_span_id"] = span.span_id

        # Record the raw template parameters alongside the formatted body.
        for k, v in kwargs.items():
            attrs[f"sentry.message.parameters.{k}"] = v

        log = {
            "severity_text": severity_text,
            "severity_number": severity_number,
            "body": template.format(**kwargs),
            "attributes": attrs,
            "time_unix_nano": time.time_ns(),
            "trace_id": None,
        } # type: Log

        # If debug is enabled, log the log to the console
        debug = self.options.get("debug", False)
        if debug:
            severity_text_to_logging_level = {
                "trace": logging.DEBUG,
                "debug": logging.DEBUG,
                "info": logging.INFO,
                "warn": logging.WARNING,
                "error": logging.ERROR,
                "fatal": logging.CRITICAL,
            }
            logger.log(
                severity_text_to_logging_level.get(severity_text, logging.DEBUG),
                f'[Sentry Logs] {log["body"]}',
            )

        propagation_context = scope.get_active_propagation_context()
        if propagation_context is not None:
            headers["trace_id"] = propagation_context.trace_id
            log["trace_id"] = propagation_context.trace_id

        envelope = Envelope(headers=headers)

        # Experimental hook: may mutate or drop the log entirely.
        before_emit_log = self.options["_experiments"].get("before_emit_log")
        if before_emit_log is not None:
            log = before_emit_log(log, {})
            if log is None:
                return

        def format_attribute(key, val):
            # type: (str, int | float | str | bool) -> Any
            # Map a Python value onto the OTel AnyValue encoding. The bool
            # check must come before int because bool subclasses int.
            if isinstance(val, bool):
                return {"key": key, "value": {"boolValue": val}}
            if isinstance(val, int):
                return {"key": key, "value": {"intValue": str(val)}}
            if isinstance(val, float):
                return {"key": key, "value": {"doubleValue": val}}
            if isinstance(val, str):
                return {"key": key, "value": {"stringValue": val}}
            return {"key": key, "value": {"stringValue": json.dumps(val)}}

        otel_log = {
            "severityText": log["severity_text"],
            "severityNumber": log["severity_number"],
            "body": {"stringValue": log["body"]},
            "timeUnixNano": str(log["time_unix_nano"]),
            "attributes": [
                format_attribute(k, v) for (k, v) in log["attributes"].items()
            ],
        }

        if "trace_id" in log:
            otel_log["traceId"] = log["trace_id"]

        envelope.add_log(otel_log) # TODO: batch these

        if self.spotlight:
            self.spotlight.capture_envelope(envelope)

        if self.transport is not None:
            self.transport.capture_envelope(envelope)
+
+ def capture_session(
+ self, session # type: Session
+ ):
+ # type: (...) -> None
+ if not session.release:
+ logger.info("Discarded session update because of missing release")
+ else:
+ self.session_flusher.add_session(session)
+
+ if TYPE_CHECKING:
+
+ @overload
+ def get_integration(self, name_or_class):
+ # type: (str) -> Optional[Integration]
+ ...
+
+ @overload
+ def get_integration(self, name_or_class):
+ # type: (type[I]) -> Optional[I]
+ ...
+
+ def get_integration(
+ self, name_or_class # type: Union[str, Type[Integration]]
+ ):
+ # type: (...) -> Optional[Integration]
+ """Returns the integration for this client by name or class.
+ If the client does not have that integration then `None` is returned.
+ """
+ if isinstance(name_or_class, str):
+ integration_name = name_or_class
+ elif name_or_class.identifier is not None:
+ integration_name = name_or_class.identifier
+ else:
+ raise ValueError("Integration has no name")
+
+ return self.integrations.get(integration_name)
+
    def close(
        self,
        timeout=None, # type: Optional[float]
        callback=None, # type: Optional[Callable[[int, float], None]]
    ):
        # type: (...) -> None
        """
        Close the client and shut down the transport. Arguments have the same
        semantics as :py:meth:`Client.flush`.
        """
        if self.transport is not None:
            # Flush pending data first, then stop the background workers
            # (session flusher, metrics, monitor) before killing the
            # transport itself and marking the client as closed.
            self.flush(timeout=timeout, callback=callback)
            self.session_flusher.kill()
            if self.metrics_aggregator is not None:
                self.metrics_aggregator.kill()
            if self.monitor:
                self.monitor.kill()
            self.transport.kill()
            self.transport = None
+
    def flush(
        self,
        timeout=None, # type: Optional[float]
        callback=None, # type: Optional[Callable[[int, float], None]]
    ):
        # type: (...) -> None
        """
        Wait for the current events to be sent.

        :param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.

        :param callback: Is invoked with the number of pending events and the configured timeout.
        """
        if self.transport is not None:
            if timeout is None:
                timeout = self.options["shutdown_timeout"]
            # Drain sessions and metrics into the transport queue before
            # waiting on the transport itself.
            self.session_flusher.flush()
            if self.metrics_aggregator is not None:
                self.metrics_aggregator.flush()
            self.transport.flush(timeout=timeout, callback=callback)
+
    def __enter__(self):
        # type: () -> _Client
        # Supports `with Client(...) as client:` usage.
        return self
+
    def __exit__(self, exc_type, exc_value, tb):
        # type: (Any, Any, Any) -> None
        # Flush and shut down the transport on leaving the `with` block,
        # regardless of whether an exception occurred.
        self.close()
+
+
# NOTE(review): TYPE_CHECKING is already imported at the top of this module;
# this re-import is redundant but harmless. Kept to avoid touching the
# module's public bottom section.
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Make mypy, PyCharm and other static analyzers think `get_options` is a
    # type to have nicer autocompletion for params.
    #
    # Use `ClientConstructor` to define the argument types of `init` and
    # `Dict[str, Any]` to tell static analyzers about the return type.

    class get_options(ClientConstructor, Dict[str, Any]): # noqa: N801
        pass

    class Client(ClientConstructor, _Client):
        pass

else:
    # Alias `get_options` for actual usage. Go through the lambda indirection
    # to throw PyCharm off of the weakly typed signature (it would otherwise
    # discover both the weakly typed signature of `_init` and our faked `init`
    # type).

    get_options = (lambda: _get_options)()
    Client = (lambda: _Client)()