diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/sessions.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/sentry_sdk/sessions.py | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/sessions.py b/.venv/lib/python3.12/site-packages/sentry_sdk/sessions.py new file mode 100644 index 00000000..eaeb915e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sentry_sdk/sessions.py @@ -0,0 +1,278 @@ +import os +import time +import warnings +from threading import Thread, Lock +from contextlib import contextmanager + +import sentry_sdk +from sentry_sdk.envelope import Envelope +from sentry_sdk.session import Session +from sentry_sdk.utils import format_timestamp + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any + from typing import Callable + from typing import Dict + from typing import Generator + from typing import List + from typing import Optional + from typing import Union + + +def is_auto_session_tracking_enabled(hub=None): + # type: (Optional[sentry_sdk.Hub]) -> Union[Any, bool, None] + """DEPRECATED: Utility function to find out if session tracking is enabled.""" + + # Internal callers should use private _is_auto_session_tracking_enabled, instead. + warnings.warn( + "This function is deprecated and will be removed in the next major release. " + "There is no public API replacement.", + DeprecationWarning, + stacklevel=2, + ) + + if hub is None: + hub = sentry_sdk.Hub.current + + should_track = hub.scope._force_auto_session_tracking + + if should_track is None: + client_options = hub.client.options if hub.client else {} + should_track = client_options.get("auto_session_tracking", False) + + return should_track + + +@contextmanager +def auto_session_tracking(hub=None, session_mode="application"): + # type: (Optional[sentry_sdk.Hub], str) -> Generator[None, None, None] + """DEPRECATED: Use track_session instead + Starts and stops a session automatically around a block. + """ + warnings.warn( + "This function is deprecated and will be removed in the next major release. " + "Use track_session instead.", + DeprecationWarning, + stacklevel=2, + ) + + if hub is None: + hub = sentry_sdk.Hub.current + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + should_track = is_auto_session_tracking_enabled(hub) + if should_track: + hub.start_session(session_mode=session_mode) + try: + yield + finally: + if should_track: + hub.end_session() + + +def is_auto_session_tracking_enabled_scope(scope): + # type: (sentry_sdk.Scope) -> bool + """ + DEPRECATED: Utility function to find out if session tracking is enabled. + """ + + warnings.warn( + "This function is deprecated and will be removed in the next major release. " + "There is no public API replacement.", + DeprecationWarning, + stacklevel=2, + ) + + # Internal callers should use private _is_auto_session_tracking_enabled, instead. + return _is_auto_session_tracking_enabled(scope) + + +def _is_auto_session_tracking_enabled(scope): + # type: (sentry_sdk.Scope) -> bool + """ + Utility function to find out if session tracking is enabled. + """ + + should_track = scope._force_auto_session_tracking + if should_track is None: + client_options = sentry_sdk.get_client().options + should_track = client_options.get("auto_session_tracking", False) + + return should_track + + +@contextmanager +def auto_session_tracking_scope(scope, session_mode="application"): + # type: (sentry_sdk.Scope, str) -> Generator[None, None, None] + """DEPRECATED: This function is a deprecated alias for track_session. + Starts and stops a session automatically around a block. + """ + + warnings.warn( + "This function is a deprecated alias for track_session and will be removed in the next major release.", + DeprecationWarning, + stacklevel=2, + ) + + with track_session(scope, session_mode=session_mode): + yield + + +@contextmanager +def track_session(scope, session_mode="application"): + # type: (sentry_sdk.Scope, str) -> Generator[None, None, None] + """ + Start a new session in the provided scope, assuming session tracking is enabled. + This is a no-op context manager if session tracking is not enabled. + """ + + should_track = _is_auto_session_tracking_enabled(scope) + if should_track: + scope.start_session(session_mode=session_mode) + try: + yield + finally: + if should_track: + scope.end_session() + + +TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed") +MAX_ENVELOPE_ITEMS = 100 + + +def make_aggregate_envelope(aggregate_states, attrs): + # type: (Any, Any) -> Any + return {"attrs": dict(attrs), "aggregates": list(aggregate_states.values())} + + +class SessionFlusher: + def __init__( + self, + capture_func, # type: Callable[[Envelope], None] + flush_interval=60, # type: int + ): + # type: (...) -> None + self.capture_func = capture_func + self.flush_interval = flush_interval + self.pending_sessions = [] # type: List[Any] + self.pending_aggregates = {} # type: Dict[Any, Any] + self._thread = None # type: Optional[Thread] + self._thread_lock = Lock() + self._aggregate_lock = Lock() + self._thread_for_pid = None # type: Optional[int] + self._running = True + + def flush(self): + # type: (...) -> None + pending_sessions = self.pending_sessions + self.pending_sessions = [] + + with self._aggregate_lock: + pending_aggregates = self.pending_aggregates + self.pending_aggregates = {} + + envelope = Envelope() + for session in pending_sessions: + if len(envelope.items) == MAX_ENVELOPE_ITEMS: + self.capture_func(envelope) + envelope = Envelope() + + envelope.add_session(session) + + for attrs, states in pending_aggregates.items(): + if len(envelope.items) == MAX_ENVELOPE_ITEMS: + self.capture_func(envelope) + envelope = Envelope() + + envelope.add_sessions(make_aggregate_envelope(states, attrs)) + + if len(envelope.items) > 0: + self.capture_func(envelope) + + def _ensure_running(self): + # type: (...) -> None + """ + Check that we have an active thread to run in, or create one if not. + + Note that this might fail (e.g. in Python 3.12 it's not possible to + spawn new threads at interpreter shutdown). In that case self._running + will be False after running this function. + """ + if self._thread_for_pid == os.getpid() and self._thread is not None: + return None + with self._thread_lock: + if self._thread_for_pid == os.getpid() and self._thread is not None: + return None + + def _thread(): + # type: (...) -> None + while self._running: + time.sleep(self.flush_interval) + if self._running: + self.flush() + + thread = Thread(target=_thread) + thread.daemon = True + try: + thread.start() + except RuntimeError: + # Unfortunately at this point the interpreter is in a state that no + # longer allows us to spawn a thread and we have to bail. + self._running = False + return None + + self._thread = thread + self._thread_for_pid = os.getpid() + + return None + + def add_aggregate_session( + self, session # type: Session + ): + # type: (...) -> None + # NOTE on `session.did`: + # the protocol can deal with buckets that have a distinct-id, however + # in practice we expect the python SDK to have an extremely high cardinality + # here, effectively making aggregation useless, therefore we do not + # aggregate per-did. + + # For this part we can get away with using the global interpreter lock + with self._aggregate_lock: + attrs = session.get_json_attrs(with_user_info=False) + primary_key = tuple(sorted(attrs.items())) + secondary_key = session.truncated_started # (, session.did) + states = self.pending_aggregates.setdefault(primary_key, {}) + state = states.setdefault(secondary_key, {}) + + if "started" not in state: + state["started"] = format_timestamp(session.truncated_started) + # if session.did is not None: + # state["did"] = session.did + if session.status == "crashed": + state["crashed"] = state.get("crashed", 0) + 1 + elif session.status == "abnormal": + state["abnormal"] = state.get("abnormal", 0) + 1 + elif session.errors > 0: + state["errored"] = state.get("errored", 0) + 1 + else: + state["exited"] = state.get("exited", 0) + 1 + + def add_session( + self, session # type: Session + ): + # type: (...) -> None + if session.session_mode == "request": + self.add_aggregate_session(session) + else: + self.pending_sessions.append(session.to_json()) + self._ensure_running() + + def kill(self): + # type: (...) -> None + self._running = False + + def __del__(self): + # type: (...) -> None + self.kill() |