diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery')
3 files changed, 864 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/__init__.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/__init__.py new file mode 100644 index 00000000..e8811d76 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/__init__.py @@ -0,0 +1,528 @@ +import sys +from collections.abc import Mapping +from functools import wraps + +import sentry_sdk +from sentry_sdk import isolation_scope +from sentry_sdk.api import continue_trace +from sentry_sdk.consts import OP, SPANSTATUS, SPANDATA +from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable +from sentry_sdk.integrations.celery.beat import ( + _patch_beat_apply_entry, + _patch_redbeat_maybe_due, + _setup_celery_beat_signals, +) +from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch +from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TransactionSource +from sentry_sdk.tracing_utils import Baggage +from sentry_sdk.utils import ( + capture_internal_exceptions, + ensure_integration_enabled, + event_from_exception, + reraise, +) + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any + from typing import Callable + from typing import List + from typing import Optional + from typing import TypeVar + from typing import Union + + from sentry_sdk._types import EventProcessor, Event, Hint, ExcInfo + from sentry_sdk.tracing import Span + + F = TypeVar("F", bound=Callable[..., Any]) + + +try: + from celery import VERSION as CELERY_VERSION # type: ignore + from celery.app.task import Task # type: ignore + from celery.app.trace import task_has_custom + from celery.exceptions import ( # type: ignore + Ignore, + Reject, + Retry, + SoftTimeLimitExceeded, + ) + from kombu import Producer # type: ignore +except ImportError: + raise DidNotEnable("Celery not installed") + + +CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject) + + +class CeleryIntegration(Integration): + identifier = "celery" + origin = f"auto.queue.{identifier}" + + def __init__( + self, + propagate_traces=True, + monitor_beat_tasks=False, + exclude_beat_tasks=None, + ): + # type: (bool, bool, Optional[List[str]]) -> None + self.propagate_traces = propagate_traces + self.monitor_beat_tasks = monitor_beat_tasks + self.exclude_beat_tasks = exclude_beat_tasks + + _patch_beat_apply_entry() + _patch_redbeat_maybe_due() + _setup_celery_beat_signals(monitor_beat_tasks) + + @staticmethod + def setup_once(): + # type: () -> None + _check_minimum_version(CeleryIntegration, CELERY_VERSION) + + _patch_build_tracer() + _patch_task_apply_async() + _patch_celery_send_task() + _patch_worker_exit() + _patch_producer_publish() + + # This logger logs every status of every task that ran on the worker. + # Meaning that every task's breadcrumbs are full of stuff like "Task + # <foo> raised unexpected <bar>". + ignore_logger("celery.worker.job") + ignore_logger("celery.app.trace") + + # This is stdout/err redirected to a logger, can't deal with this + # (need event_level=logging.WARN to reproduce) + ignore_logger("celery.redirected") + + +def _set_status(status): + # type: (str) -> None + with capture_internal_exceptions(): + scope = sentry_sdk.get_current_scope() + if scope.span is not None: + scope.span.set_status(status) + + +def _capture_exception(task, exc_info): + # type: (Any, ExcInfo) -> None + client = sentry_sdk.get_client() + if client.get_integration(CeleryIntegration) is None: + return + + if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS): + # ??? Doesn't map to anything + _set_status("aborted") + return + + _set_status("internal_error") + + if hasattr(task, "throws") and isinstance(exc_info[1], task.throws): + return + + event, hint = event_from_exception( + exc_info, + client_options=client.options, + mechanism={"type": "celery", "handled": False}, + ) + + sentry_sdk.capture_event(event, hint=hint) + + +def _make_event_processor(task, uuid, args, kwargs, request=None): + # type: (Any, Any, Any, Any, Optional[Any]) -> EventProcessor + def event_processor(event, hint): + # type: (Event, Hint) -> Optional[Event] + + with capture_internal_exceptions(): + tags = event.setdefault("tags", {}) + tags["celery_task_id"] = uuid + extra = event.setdefault("extra", {}) + extra["celery-job"] = { + "task_name": task.name, + "args": args, + "kwargs": kwargs, + } + + if "exc_info" in hint: + with capture_internal_exceptions(): + if issubclass(hint["exc_info"][0], SoftTimeLimitExceeded): + event["fingerprint"] = [ + "celery", + "SoftTimeLimitExceeded", + getattr(task, "name", task), + ] + + return event + + return event_processor + + +def _update_celery_task_headers(original_headers, span, monitor_beat_tasks): + # type: (dict[str, Any], Optional[Span], bool) -> dict[str, Any] + """ + Updates the headers of the Celery task with the tracing information + and eventually Sentry Crons monitoring information for beat tasks. + """ + updated_headers = original_headers.copy() + with capture_internal_exceptions(): + # if span is None (when the task was started by Celery Beat) + # this will return the trace headers from the scope. + headers = dict( + sentry_sdk.get_isolation_scope().iter_trace_propagation_headers(span=span) + ) + + if monitor_beat_tasks: + headers.update( + { + "sentry-monitor-start-timestamp-s": "%.9f" + % _now_seconds_since_epoch(), + } + ) + + # Add the time the task was enqueued to the headers + # This is used in the consumer to calculate the latency + updated_headers.update( + {"sentry-task-enqueued-time": _now_seconds_since_epoch()} + ) + + if headers: + existing_baggage = updated_headers.get(BAGGAGE_HEADER_NAME) + sentry_baggage = headers.get(BAGGAGE_HEADER_NAME) + + combined_baggage = sentry_baggage or existing_baggage + if sentry_baggage and existing_baggage: + # Merge incoming and sentry baggage, where the sentry trace information + # in the incoming baggage takes precedence and the third-party items + # are concatenated. + incoming = Baggage.from_incoming_header(existing_baggage) + combined = Baggage.from_incoming_header(sentry_baggage) + combined.sentry_items.update(incoming.sentry_items) + combined.third_party_items = ",".join( + [ + x + for x in [ + combined.third_party_items, + incoming.third_party_items, + ] + if x is not None and x != "" + ] + ) + combined_baggage = combined.serialize(include_third_party=True) + + updated_headers.update(headers) + if combined_baggage: + updated_headers[BAGGAGE_HEADER_NAME] = combined_baggage + + # https://github.com/celery/celery/issues/4875 + # + # Need to setdefault the inner headers too since other + # tracing tools (dd-trace-py) also employ this exact + # workaround and we don't want to break them. + updated_headers.setdefault("headers", {}).update(headers) + if combined_baggage: + updated_headers["headers"][BAGGAGE_HEADER_NAME] = combined_baggage + + # Add the Sentry options potentially added in `sentry_apply_entry` + # to the headers (done when auto-instrumenting Celery Beat tasks) + for key, value in updated_headers.items(): + if key.startswith("sentry-"): + updated_headers["headers"][key] = value + + return updated_headers + + +class NoOpMgr: + def __enter__(self): + # type: () -> None + return None + + def __exit__(self, exc_type, exc_value, traceback): + # type: (Any, Any, Any) -> None + return None + + +def _wrap_task_run(f): + # type: (F) -> F + @wraps(f) + def apply_async(*args, **kwargs): + # type: (*Any, **Any) -> Any + # Note: kwargs can contain headers=None, so no setdefault! + # Unsure which backend though. + integration = sentry_sdk.get_client().get_integration(CeleryIntegration) + if integration is None: + return f(*args, **kwargs) + + kwarg_headers = kwargs.get("headers") or {} + propagate_traces = kwarg_headers.pop( + "sentry-propagate-traces", integration.propagate_traces + ) + + if not propagate_traces: + return f(*args, **kwargs) + + if isinstance(args[0], Task): + task_name = args[0].name # type: str + elif len(args) > 1 and isinstance(args[1], str): + task_name = args[1] + else: + task_name = "<unknown Celery task>" + + task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat" + + span_mgr = ( + sentry_sdk.start_span( + op=OP.QUEUE_SUBMIT_CELERY, + name=task_name, + origin=CeleryIntegration.origin, + ) + if not task_started_from_beat + else NoOpMgr() + ) # type: Union[Span, NoOpMgr] + + with span_mgr as span: + kwargs["headers"] = _update_celery_task_headers( + kwarg_headers, span, integration.monitor_beat_tasks + ) + return f(*args, **kwargs) + + return apply_async # type: ignore + + +def _wrap_tracer(task, f): + # type: (Any, F) -> F + + # Need to wrap tracer for pushing the scope before prerun is sent, and + # popping it after postrun is sent. + # + # This is the reason we don't use signals for hooking in the first place. + # Also because in Celery 3, signal dispatch returns early if one handler + # crashes. + @wraps(f) + @ensure_integration_enabled(CeleryIntegration, f) + def _inner(*args, **kwargs): + # type: (*Any, **Any) -> Any + with isolation_scope() as scope: + scope._name = "celery" + scope.clear_breadcrumbs() + scope.add_event_processor(_make_event_processor(task, *args, **kwargs)) + + transaction = None + + # Celery task objects are not a thing to be trusted. Even + # something such as attribute access can fail. + with capture_internal_exceptions(): + headers = args[3].get("headers") or {} + transaction = continue_trace( + headers, + op=OP.QUEUE_TASK_CELERY, + name="unknown celery task", + source=TransactionSource.TASK, + origin=CeleryIntegration.origin, + ) + transaction.name = task.name + transaction.set_status(SPANSTATUS.OK) + + if transaction is None: + return f(*args, **kwargs) + + with sentry_sdk.start_transaction( + transaction, + custom_sampling_context={ + "celery_job": { + "task": task.name, + # for some reason, args[1] is a list if non-empty but a + # tuple if empty + "args": list(args[1]), + "kwargs": args[2], + } + }, + ): + return f(*args, **kwargs) + + return _inner # type: ignore + + +def _set_messaging_destination_name(task, span): + # type: (Any, Span) -> None + """Set "messaging.destination.name" tag for span""" + with capture_internal_exceptions(): + delivery_info = task.request.delivery_info + if delivery_info: + routing_key = delivery_info.get("routing_key") + if delivery_info.get("exchange") == "" and routing_key is not None: + # Empty exchange indicates the default exchange, meaning the tasks + # are sent to the queue with the same name as the routing key. + span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + + +def _wrap_task_call(task, f): + # type: (Any, F) -> F + + # Need to wrap task call because the exception is caught before we get to + # see it. Also celery's reported stacktrace is untrustworthy. + + # functools.wraps is important here because celery-once looks at this + # method's name. @ensure_integration_enabled internally calls functools.wraps, + # but if we ever remove the @ensure_integration_enabled decorator, we need + # to add @functools.wraps(f) here. + # https://github.com/getsentry/sentry-python/issues/421 + @ensure_integration_enabled(CeleryIntegration, f) + def _inner(*args, **kwargs): + # type: (*Any, **Any) -> Any + try: + with sentry_sdk.start_span( + op=OP.QUEUE_PROCESS, + name=task.name, + origin=CeleryIntegration.origin, + ) as span: + _set_messaging_destination_name(task, span) + + latency = None + with capture_internal_exceptions(): + if ( + task.request.headers is not None + and "sentry-task-enqueued-time" in task.request.headers + ): + latency = _now_seconds_since_epoch() - task.request.headers.pop( + "sentry-task-enqueued-time" + ) + + if latency is not None: + span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) + + with capture_internal_exceptions(): + span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id) + + with capture_internal_exceptions(): + span.set_data( + SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries + ) + + with capture_internal_exceptions(): + span.set_data( + SPANDATA.MESSAGING_SYSTEM, + task.app.connection().transport.driver_type, + ) + + return f(*args, **kwargs) + except Exception: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + _capture_exception(task, exc_info) + reraise(*exc_info) + + return _inner # type: ignore + + +def _patch_build_tracer(): + # type: () -> None + import celery.app.trace as trace # type: ignore + + original_build_tracer = trace.build_tracer + + def sentry_build_tracer(name, task, *args, **kwargs): + # type: (Any, Any, *Any, **Any) -> Any + if not getattr(task, "_sentry_is_patched", False): + # determine whether Celery will use __call__ or run and patch + # accordingly + if task_has_custom(task, "__call__"): + type(task).__call__ = _wrap_task_call(task, type(task).__call__) + else: + task.run = _wrap_task_call(task, task.run) + + # `build_tracer` is apparently called for every task + # invocation. Can't wrap every celery task for every invocation + # or we will get infinitely nested wrapper functions. + task._sentry_is_patched = True + + return _wrap_tracer(task, original_build_tracer(name, task, *args, **kwargs)) + + trace.build_tracer = sentry_build_tracer + + +def _patch_task_apply_async(): + # type: () -> None + Task.apply_async = _wrap_task_run(Task.apply_async) + + +def _patch_celery_send_task(): + # type: () -> None + from celery import Celery + + Celery.send_task = _wrap_task_run(Celery.send_task) + + +def _patch_worker_exit(): + # type: () -> None + + # Need to flush queue before worker shutdown because a crashing worker will + # call os._exit + from billiard.pool import Worker # type: ignore + + original_workloop = Worker.workloop + + def sentry_workloop(*args, **kwargs): + # type: (*Any, **Any) -> Any + try: + return original_workloop(*args, **kwargs) + finally: + with capture_internal_exceptions(): + if ( + sentry_sdk.get_client().get_integration(CeleryIntegration) + is not None + ): + sentry_sdk.flush() + + Worker.workloop = sentry_workloop + + +def _patch_producer_publish(): + # type: () -> None + original_publish = Producer.publish + + @ensure_integration_enabled(CeleryIntegration, original_publish) + def sentry_publish(self, *args, **kwargs): + # type: (Producer, *Any, **Any) -> Any + kwargs_headers = kwargs.get("headers", {}) + if not isinstance(kwargs_headers, Mapping): + # Ensure kwargs_headers is a Mapping, so we can safely call get(). + # We don't expect this to happen, but it's better to be safe. Even + # if it does happen, only our instrumentation breaks. This line + # does not overwrite kwargs["headers"], so the original publish + # method will still work. + kwargs_headers = {} + + task_name = kwargs_headers.get("task") + task_id = kwargs_headers.get("id") + retries = kwargs_headers.get("retries") + + routing_key = kwargs.get("routing_key") + exchange = kwargs.get("exchange") + + with sentry_sdk.start_span( + op=OP.QUEUE_PUBLISH, + name=task_name, + origin=CeleryIntegration.origin, + ) as span: + if task_id is not None: + span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id) + + if exchange == "" and routing_key is not None: + # Empty exchange indicates the default exchange, meaning messages are + # routed to the queue with the same name as the routing key. + span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + + if retries is not None: + span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) + + with capture_internal_exceptions(): + span.set_data( + SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type + ) + + return original_publish(self, *args, **kwargs) + + Producer.publish = sentry_publish diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py new file mode 100644 index 00000000..ddbc8561 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py @@ -0,0 +1,293 @@ +import sentry_sdk +from sentry_sdk.crons import capture_checkin, MonitorStatus +from sentry_sdk.integrations import DidNotEnable +from sentry_sdk.integrations.celery.utils import ( + _get_humanized_interval, + _now_seconds_since_epoch, +) +from sentry_sdk.utils import ( + logger, + match_regex_list, +) + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Callable + from typing import Any, Optional, TypeVar, Union + from sentry_sdk._types import ( + MonitorConfig, + MonitorConfigScheduleType, + MonitorConfigScheduleUnit, + ) + + F = TypeVar("F", bound=Callable[..., Any]) + + +try: + from celery import Task, Celery # type: ignore + from celery.beat import Scheduler # type: ignore + from celery.schedules import crontab, schedule # type: ignore + from celery.signals import ( # type: ignore + task_failure, + task_success, + task_retry, + ) +except ImportError: + raise DidNotEnable("Celery not installed") + +try: + from redbeat.schedulers import RedBeatScheduler # type: ignore +except ImportError: + RedBeatScheduler = None + + +def _get_headers(task): + # type: (Task) -> dict[str, Any] + headers = task.request.get("headers") or {} + + # flatten nested headers + if "headers" in headers: + headers.update(headers["headers"]) + del headers["headers"] + + headers.update(task.request.get("properties") or {}) + + return headers + + +def _get_monitor_config(celery_schedule, app, monitor_name): + # type: (Any, Celery, str) -> MonitorConfig + monitor_config = {} # type: MonitorConfig + schedule_type = None # type: Optional[MonitorConfigScheduleType] + schedule_value = None # type: Optional[Union[str, int]] + schedule_unit = None # type: Optional[MonitorConfigScheduleUnit] + + if isinstance(celery_schedule, crontab): + schedule_type = "crontab" + schedule_value = ( + "{0._orig_minute} " + "{0._orig_hour} " + "{0._orig_day_of_month} " + "{0._orig_month_of_year} " + "{0._orig_day_of_week}".format(celery_schedule) + ) + elif isinstance(celery_schedule, schedule): + schedule_type = "interval" + (schedule_value, schedule_unit) = _get_humanized_interval( + celery_schedule.seconds + ) + + if schedule_unit == "second": + logger.warning( + "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.", + monitor_name, + schedule_value, + ) + return {} + + else: + logger.warning( + "Celery schedule type '%s' not supported by Sentry Crons.", + type(celery_schedule), + ) + return {} + + monitor_config["schedule"] = {} + monitor_config["schedule"]["type"] = schedule_type + monitor_config["schedule"]["value"] = schedule_value + + if schedule_unit is not None: + monitor_config["schedule"]["unit"] = schedule_unit + + monitor_config["timezone"] = ( + ( + hasattr(celery_schedule, "tz") + and celery_schedule.tz is not None + and str(celery_schedule.tz) + ) + or app.timezone + or "UTC" + ) + + return monitor_config + + +def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration): + # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None + """ + Add Sentry Crons information to the schedule_entry headers. + """ + if not integration.monitor_beat_tasks: + return + + monitor_name = schedule_entry.name + + task_should_be_excluded = match_regex_list( + monitor_name, integration.exclude_beat_tasks + ) + if task_should_be_excluded: + return + + celery_schedule = schedule_entry.schedule + app = scheduler.app + + monitor_config = _get_monitor_config(celery_schedule, app, monitor_name) + + is_supported_schedule = bool(monitor_config) + if not is_supported_schedule: + return + + headers = schedule_entry.options.pop("headers", {}) + headers.update( + { + "sentry-monitor-slug": monitor_name, + "sentry-monitor-config": monitor_config, + } + ) + + check_in_id = capture_checkin( + monitor_slug=monitor_name, + monitor_config=monitor_config, + status=MonitorStatus.IN_PROGRESS, + ) + headers.update({"sentry-monitor-check-in-id": check_in_id}) + + # Set the Sentry configuration in the options of the ScheduleEntry. + # Those will be picked up in `apply_async` and added to the headers. + schedule_entry.options["headers"] = headers + + +def _wrap_beat_scheduler(original_function): + # type: (Callable[..., Any]) -> Callable[..., Any] + """ + Makes sure that: + - a new Sentry trace is started for each task started by Celery Beat and + it is propagated to the task. + - the Sentry Crons information is set in the Celery Beat task's + headers so that is is monitored with Sentry Crons. + + After the patched function is called, + Celery Beat will call apply_async to put the task in the queue. + """ + # Patch only once + # Can't use __name__ here, because some of our tests mock original_apply_entry + already_patched = "sentry_patched_scheduler" in str(original_function) + if already_patched: + return original_function + + from sentry_sdk.integrations.celery import CeleryIntegration + + def sentry_patched_scheduler(*args, **kwargs): + # type: (*Any, **Any) -> None + integration = sentry_sdk.get_client().get_integration(CeleryIntegration) + if integration is None: + return original_function(*args, **kwargs) + + # Tasks started by Celery Beat start a new Trace + scope = sentry_sdk.get_isolation_scope() + scope.set_new_propagation_context() + scope._name = "celery-beat" + + scheduler, schedule_entry = args + _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration) + + return original_function(*args, **kwargs) + + return sentry_patched_scheduler + + +def _patch_beat_apply_entry(): + # type: () -> None + Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry) + + +def _patch_redbeat_maybe_due(): + # type: () -> None + if RedBeatScheduler is None: + return + + RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due) + + +def _setup_celery_beat_signals(monitor_beat_tasks): + # type: (bool) -> None + if monitor_beat_tasks: + task_success.connect(crons_task_success) + task_failure.connect(crons_task_failure) + task_retry.connect(crons_task_retry) + + +def crons_task_success(sender, **kwargs): + # type: (Task, dict[Any, Any]) -> None + logger.debug("celery_task_success %s", sender) + headers = _get_headers(sender) + + if "sentry-monitor-slug" not in headers: + return + + monitor_config = headers.get("sentry-monitor-config", {}) + + start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s") + + capture_checkin( + monitor_slug=headers["sentry-monitor-slug"], + monitor_config=monitor_config, + check_in_id=headers["sentry-monitor-check-in-id"], + duration=( + _now_seconds_since_epoch() - float(start_timestamp_s) + if start_timestamp_s + else None + ), + status=MonitorStatus.OK, + ) + + +def crons_task_failure(sender, **kwargs): + # type: (Task, dict[Any, Any]) -> None + logger.debug("celery_task_failure %s", sender) + headers = _get_headers(sender) + + if "sentry-monitor-slug" not in headers: + return + + monitor_config = headers.get("sentry-monitor-config", {}) + + start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s") + + capture_checkin( + monitor_slug=headers["sentry-monitor-slug"], + monitor_config=monitor_config, + check_in_id=headers["sentry-monitor-check-in-id"], + duration=( + _now_seconds_since_epoch() - float(start_timestamp_s) + if start_timestamp_s + else None + ), + status=MonitorStatus.ERROR, + ) + + +def crons_task_retry(sender, **kwargs): + # type: (Task, dict[Any, Any]) -> None + logger.debug("celery_task_retry %s", sender) + headers = _get_headers(sender) + + if "sentry-monitor-slug" not in headers: + return + + monitor_config = headers.get("sentry-monitor-config", {}) + + start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s") + + capture_checkin( + monitor_slug=headers["sentry-monitor-slug"], + monitor_config=monitor_config, + check_in_id=headers["sentry-monitor-check-in-id"], + duration=( + _now_seconds_since_epoch() - float(start_timestamp_s) + if start_timestamp_s + else None + ), + status=MonitorStatus.ERROR, + ) diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/utils.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/utils.py new file mode 100644 index 00000000..a1961b15 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/utils.py @@ -0,0 +1,43 @@ +import time +from typing import TYPE_CHECKING, cast + +if TYPE_CHECKING: + from typing import Any, Tuple + from sentry_sdk._types import MonitorConfigScheduleUnit + + +def _now_seconds_since_epoch(): + # type: () -> float + # We cannot use `time.perf_counter()` when dealing with the duration + # of a Celery task, because the start of a Celery task and + # the end are recorded in different processes. + # Start happens in the Celery Beat process, + # the end in a Celery Worker process. + return time.time() + + +def _get_humanized_interval(seconds): + # type: (float) -> Tuple[int, MonitorConfigScheduleUnit] + TIME_UNITS = ( # noqa: N806 + ("day", 60 * 60 * 24.0), + ("hour", 60 * 60.0), + ("minute", 60.0), + ) + + seconds = float(seconds) + for unit, divider in TIME_UNITS: + if seconds >= divider: + interval = int(seconds / divider) + return (interval, cast("MonitorConfigScheduleUnit", unit)) + + return (int(seconds), "second") + + +class NoOpMgr: + def __enter__(self): + # type: () -> None + return None + + def __exit__(self, exc_type, exc_value, traceback): + # type: (Any, Any, Any) -> None + return None |