about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/__init__.py528
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py293
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/utils.py43
3 files changed, 864 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/__init__.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/__init__.py
new file mode 100644
index 00000000..e8811d76
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/__init__.py
@@ -0,0 +1,528 @@
+import sys
+from collections.abc import Mapping
+from functools import wraps
+
+import sentry_sdk
+from sentry_sdk import isolation_scope
+from sentry_sdk.api import continue_trace
+from sentry_sdk.consts import OP, SPANSTATUS, SPANDATA
+from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable
+from sentry_sdk.integrations.celery.beat import (
+    _patch_beat_apply_entry,
+    _patch_redbeat_maybe_due,
+    _setup_celery_beat_signals,
+)
+from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch
+from sentry_sdk.integrations.logging import ignore_logger
+from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TransactionSource
+from sentry_sdk.tracing_utils import Baggage
+from sentry_sdk.utils import (
+    capture_internal_exceptions,
+    ensure_integration_enabled,
+    event_from_exception,
+    reraise,
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any
+    from typing import Callable
+    from typing import List
+    from typing import Optional
+    from typing import TypeVar
+    from typing import Union
+
+    from sentry_sdk._types import EventProcessor, Event, Hint, ExcInfo
+    from sentry_sdk.tracing import Span
+
+    F = TypeVar("F", bound=Callable[..., Any])
+
+
+try:
+    from celery import VERSION as CELERY_VERSION  # type: ignore
+    from celery.app.task import Task  # type: ignore
+    from celery.app.trace import task_has_custom
+    from celery.exceptions import (  # type: ignore
+        Ignore,
+        Reject,
+        Retry,
+        SoftTimeLimitExceeded,
+    )
+    from kombu import Producer  # type: ignore
+except ImportError:
+    raise DidNotEnable("Celery not installed")
+
+
+CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject)
+
+
+class CeleryIntegration(Integration):
+    identifier = "celery"
+    origin = f"auto.queue.{identifier}"
+
+    def __init__(
+        self,
+        propagate_traces=True,
+        monitor_beat_tasks=False,
+        exclude_beat_tasks=None,
+    ):
+        # type: (bool, bool, Optional[List[str]]) -> None
+        self.propagate_traces = propagate_traces
+        self.monitor_beat_tasks = monitor_beat_tasks
+        self.exclude_beat_tasks = exclude_beat_tasks
+
+        _patch_beat_apply_entry()
+        _patch_redbeat_maybe_due()
+        _setup_celery_beat_signals(monitor_beat_tasks)
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        _check_minimum_version(CeleryIntegration, CELERY_VERSION)
+
+        _patch_build_tracer()
+        _patch_task_apply_async()
+        _patch_celery_send_task()
+        _patch_worker_exit()
+        _patch_producer_publish()
+
+        # This logger logs every status of every task that ran on the worker.
+        # Meaning that every task's breadcrumbs are full of stuff like "Task
+        # <foo> raised unexpected <bar>".
+        ignore_logger("celery.worker.job")
+        ignore_logger("celery.app.trace")
+
+        # This is stdout/err redirected to a logger, can't deal with this
+        # (need event_level=logging.WARN to reproduce)
+        ignore_logger("celery.redirected")
+
+
+def _set_status(status):
+    # type: (str) -> None
+    with capture_internal_exceptions():
+        scope = sentry_sdk.get_current_scope()
+        if scope.span is not None:
+            scope.span.set_status(status)
+
+
+def _capture_exception(task, exc_info):
+    # type: (Any, ExcInfo) -> None
+    client = sentry_sdk.get_client()
+    if client.get_integration(CeleryIntegration) is None:
+        return
+
+    if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS):
+        # ??? Doesn't map to anything
+        _set_status("aborted")
+        return
+
+    _set_status("internal_error")
+
+    if hasattr(task, "throws") and isinstance(exc_info[1], task.throws):
+        return
+
+    event, hint = event_from_exception(
+        exc_info,
+        client_options=client.options,
+        mechanism={"type": "celery", "handled": False},
+    )
+
+    sentry_sdk.capture_event(event, hint=hint)
+
+
+def _make_event_processor(task, uuid, args, kwargs, request=None):
+    # type: (Any, Any, Any, Any, Optional[Any]) -> EventProcessor
+    def event_processor(event, hint):
+        # type: (Event, Hint) -> Optional[Event]
+
+        with capture_internal_exceptions():
+            tags = event.setdefault("tags", {})
+            tags["celery_task_id"] = uuid
+            extra = event.setdefault("extra", {})
+            extra["celery-job"] = {
+                "task_name": task.name,
+                "args": args,
+                "kwargs": kwargs,
+            }
+
+        if "exc_info" in hint:
+            with capture_internal_exceptions():
+                if issubclass(hint["exc_info"][0], SoftTimeLimitExceeded):
+                    event["fingerprint"] = [
+                        "celery",
+                        "SoftTimeLimitExceeded",
+                        getattr(task, "name", task),
+                    ]
+
+        return event
+
+    return event_processor
+
+
+def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
+    # type: (dict[str, Any], Optional[Span], bool) -> dict[str, Any]
+    """
+    Updates the headers of the Celery task with the tracing information
+    and eventually Sentry Crons monitoring information for beat tasks.
+    """
+    updated_headers = original_headers.copy()
+    with capture_internal_exceptions():
+        # if span is None (when the task was started by Celery Beat)
+        # this will return the trace headers from the scope.
+        headers = dict(
+            sentry_sdk.get_isolation_scope().iter_trace_propagation_headers(span=span)
+        )
+
+        if monitor_beat_tasks:
+            headers.update(
+                {
+                    "sentry-monitor-start-timestamp-s": "%.9f"
+                    % _now_seconds_since_epoch(),
+                }
+            )
+
+        # Add the time the task was enqueued to the headers
+        # This is used in the consumer to calculate the latency
+        updated_headers.update(
+            {"sentry-task-enqueued-time": _now_seconds_since_epoch()}
+        )
+
+        if headers:
+            existing_baggage = updated_headers.get(BAGGAGE_HEADER_NAME)
+            sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)
+
+            combined_baggage = sentry_baggage or existing_baggage
+            if sentry_baggage and existing_baggage:
+                # Merge incoming and sentry baggage, where the sentry trace information
+                # in the incoming baggage takes precedence and the third-party items
+                # are concatenated.
+                incoming = Baggage.from_incoming_header(existing_baggage)
+                combined = Baggage.from_incoming_header(sentry_baggage)
+                combined.sentry_items.update(incoming.sentry_items)
+                combined.third_party_items = ",".join(
+                    [
+                        x
+                        for x in [
+                            combined.third_party_items,
+                            incoming.third_party_items,
+                        ]
+                        if x is not None and x != ""
+                    ]
+                )
+                combined_baggage = combined.serialize(include_third_party=True)
+
+            updated_headers.update(headers)
+            if combined_baggage:
+                updated_headers[BAGGAGE_HEADER_NAME] = combined_baggage
+
+            # https://github.com/celery/celery/issues/4875
+            #
+            # Need to setdefault the inner headers too since other
+            # tracing tools (dd-trace-py) also employ this exact
+            # workaround and we don't want to break them.
+            updated_headers.setdefault("headers", {}).update(headers)
+            if combined_baggage:
+                updated_headers["headers"][BAGGAGE_HEADER_NAME] = combined_baggage
+
+            # Add the Sentry options potentially added in `sentry_apply_entry`
+            # to the headers (done when auto-instrumenting Celery Beat tasks)
+            for key, value in updated_headers.items():
+                if key.startswith("sentry-"):
+                    updated_headers["headers"][key] = value
+
+    return updated_headers
+
+
+class NoOpMgr:
+    def __enter__(self):
+        # type: () -> None
+        return None
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        # type: (Any, Any, Any) -> None
+        return None
+
+
+def _wrap_task_run(f):
+    # type: (F) -> F
+    @wraps(f)
+    def apply_async(*args, **kwargs):
+        # type: (*Any, **Any) -> Any
+        # Note: kwargs can contain headers=None, so no setdefault!
+        # Unsure which backend though.
+        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
+        if integration is None:
+            return f(*args, **kwargs)
+
+        kwarg_headers = kwargs.get("headers") or {}
+        propagate_traces = kwarg_headers.pop(
+            "sentry-propagate-traces", integration.propagate_traces
+        )
+
+        if not propagate_traces:
+            return f(*args, **kwargs)
+
+        if isinstance(args[0], Task):
+            task_name = args[0].name  # type: str
+        elif len(args) > 1 and isinstance(args[1], str):
+            task_name = args[1]
+        else:
+            task_name = "<unknown Celery task>"
+
+        task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat"
+
+        span_mgr = (
+            sentry_sdk.start_span(
+                op=OP.QUEUE_SUBMIT_CELERY,
+                name=task_name,
+                origin=CeleryIntegration.origin,
+            )
+            if not task_started_from_beat
+            else NoOpMgr()
+        )  # type: Union[Span, NoOpMgr]
+
+        with span_mgr as span:
+            kwargs["headers"] = _update_celery_task_headers(
+                kwarg_headers, span, integration.monitor_beat_tasks
+            )
+            return f(*args, **kwargs)
+
+    return apply_async  # type: ignore
+
+
+def _wrap_tracer(task, f):
+    # type: (Any, F) -> F
+
+    # Need to wrap tracer for pushing the scope before prerun is sent, and
+    # popping it after postrun is sent.
+    #
+    # This is the reason we don't use signals for hooking in the first place.
+    # Also because in Celery 3, signal dispatch returns early if one handler
+    # crashes.
+    @wraps(f)
+    @ensure_integration_enabled(CeleryIntegration, f)
+    def _inner(*args, **kwargs):
+        # type: (*Any, **Any) -> Any
+        with isolation_scope() as scope:
+            scope._name = "celery"
+            scope.clear_breadcrumbs()
+            scope.add_event_processor(_make_event_processor(task, *args, **kwargs))
+
+            transaction = None
+
+            # Celery task objects are not a thing to be trusted. Even
+            # something such as attribute access can fail.
+            with capture_internal_exceptions():
+                headers = args[3].get("headers") or {}
+                transaction = continue_trace(
+                    headers,
+                    op=OP.QUEUE_TASK_CELERY,
+                    name="unknown celery task",
+                    source=TransactionSource.TASK,
+                    origin=CeleryIntegration.origin,
+                )
+                transaction.name = task.name
+                transaction.set_status(SPANSTATUS.OK)
+
+            if transaction is None:
+                return f(*args, **kwargs)
+
+            with sentry_sdk.start_transaction(
+                transaction,
+                custom_sampling_context={
+                    "celery_job": {
+                        "task": task.name,
+                        # for some reason, args[1] is a list if non-empty but a
+                        # tuple if empty
+                        "args": list(args[1]),
+                        "kwargs": args[2],
+                    }
+                },
+            ):
+                return f(*args, **kwargs)
+
+    return _inner  # type: ignore
+
+
+def _set_messaging_destination_name(task, span):
+    # type: (Any, Span) -> None
+    """Set "messaging.destination.name" tag for span"""
+    with capture_internal_exceptions():
+        delivery_info = task.request.delivery_info
+        if delivery_info:
+            routing_key = delivery_info.get("routing_key")
+            if delivery_info.get("exchange") == "" and routing_key is not None:
+                # Empty exchange indicates the default exchange, meaning the tasks
+                # are sent to the queue with the same name as the routing key.
+                span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
+
+
+def _wrap_task_call(task, f):
+    # type: (Any, F) -> F
+
+    # Need to wrap task call because the exception is caught before we get to
+    # see it. Also celery's reported stacktrace is untrustworthy.
+
+    # functools.wraps is important here because celery-once looks at this
+    # method's name. @ensure_integration_enabled internally calls functools.wraps,
+    # but if we ever remove the @ensure_integration_enabled decorator, we need
+    # to add @functools.wraps(f) here.
+    # https://github.com/getsentry/sentry-python/issues/421
+    @ensure_integration_enabled(CeleryIntegration, f)
+    def _inner(*args, **kwargs):
+        # type: (*Any, **Any) -> Any
+        try:
+            with sentry_sdk.start_span(
+                op=OP.QUEUE_PROCESS,
+                name=task.name,
+                origin=CeleryIntegration.origin,
+            ) as span:
+                _set_messaging_destination_name(task, span)
+
+                latency = None
+                with capture_internal_exceptions():
+                    if (
+                        task.request.headers is not None
+                        and "sentry-task-enqueued-time" in task.request.headers
+                    ):
+                        latency = _now_seconds_since_epoch() - task.request.headers.pop(
+                            "sentry-task-enqueued-time"
+                        )
+
+                if latency is not None:
+                    span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
+
+                with capture_internal_exceptions():
+                    span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
+
+                with capture_internal_exceptions():
+                    span.set_data(
+                        SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
+                    )
+
+                with capture_internal_exceptions():
+                    span.set_data(
+                        SPANDATA.MESSAGING_SYSTEM,
+                        task.app.connection().transport.driver_type,
+                    )
+
+                return f(*args, **kwargs)
+        except Exception:
+            exc_info = sys.exc_info()
+            with capture_internal_exceptions():
+                _capture_exception(task, exc_info)
+            reraise(*exc_info)
+
+    return _inner  # type: ignore
+
+
+def _patch_build_tracer():
+    # type: () -> None
+    import celery.app.trace as trace  # type: ignore
+
+    original_build_tracer = trace.build_tracer
+
+    def sentry_build_tracer(name, task, *args, **kwargs):
+        # type: (Any, Any, *Any, **Any) -> Any
+        if not getattr(task, "_sentry_is_patched", False):
+            # determine whether Celery will use __call__ or run and patch
+            # accordingly
+            if task_has_custom(task, "__call__"):
+                type(task).__call__ = _wrap_task_call(task, type(task).__call__)
+            else:
+                task.run = _wrap_task_call(task, task.run)
+
+            # `build_tracer` is apparently called for every task
+            # invocation. Can't wrap every celery task for every invocation
+            # or we will get infinitely nested wrapper functions.
+            task._sentry_is_patched = True
+
+        return _wrap_tracer(task, original_build_tracer(name, task, *args, **kwargs))
+
+    trace.build_tracer = sentry_build_tracer
+
+
+def _patch_task_apply_async():
+    # type: () -> None
+    Task.apply_async = _wrap_task_run(Task.apply_async)
+
+
+def _patch_celery_send_task():
+    # type: () -> None
+    from celery import Celery
+
+    Celery.send_task = _wrap_task_run(Celery.send_task)
+
+
+def _patch_worker_exit():
+    # type: () -> None
+
+    # Need to flush queue before worker shutdown because a crashing worker will
+    # call os._exit
+    from billiard.pool import Worker  # type: ignore
+
+    original_workloop = Worker.workloop
+
+    def sentry_workloop(*args, **kwargs):
+        # type: (*Any, **Any) -> Any
+        try:
+            return original_workloop(*args, **kwargs)
+        finally:
+            with capture_internal_exceptions():
+                if (
+                    sentry_sdk.get_client().get_integration(CeleryIntegration)
+                    is not None
+                ):
+                    sentry_sdk.flush()
+
+    Worker.workloop = sentry_workloop
+
+
+def _patch_producer_publish():
+    # type: () -> None
+    original_publish = Producer.publish
+
+    @ensure_integration_enabled(CeleryIntegration, original_publish)
+    def sentry_publish(self, *args, **kwargs):
+        # type: (Producer, *Any, **Any) -> Any
+        kwargs_headers = kwargs.get("headers", {})
+        if not isinstance(kwargs_headers, Mapping):
+            # Ensure kwargs_headers is a Mapping, so we can safely call get().
+            # We don't expect this to happen, but it's better to be safe. Even
+            # if it does happen, only our instrumentation breaks. This line
+            # does not overwrite kwargs["headers"], so the original publish
+            # method will still work.
+            kwargs_headers = {}
+
+        task_name = kwargs_headers.get("task")
+        task_id = kwargs_headers.get("id")
+        retries = kwargs_headers.get("retries")
+
+        routing_key = kwargs.get("routing_key")
+        exchange = kwargs.get("exchange")
+
+        with sentry_sdk.start_span(
+            op=OP.QUEUE_PUBLISH,
+            name=task_name,
+            origin=CeleryIntegration.origin,
+        ) as span:
+            if task_id is not None:
+                span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
+
+            if exchange == "" and routing_key is not None:
+                # Empty exchange indicates the default exchange, meaning messages are
+                # routed to the queue with the same name as the routing key.
+                span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
+
+            if retries is not None:
+                span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
+
+            with capture_internal_exceptions():
+                span.set_data(
+                    SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
+                )
+
+            return original_publish(self, *args, **kwargs)
+
+    Producer.publish = sentry_publish
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py
new file mode 100644
index 00000000..ddbc8561
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py
@@ -0,0 +1,293 @@
+import sentry_sdk
+from sentry_sdk.crons import capture_checkin, MonitorStatus
+from sentry_sdk.integrations import DidNotEnable
+from sentry_sdk.integrations.celery.utils import (
+    _get_humanized_interval,
+    _now_seconds_since_epoch,
+)
+from sentry_sdk.utils import (
+    logger,
+    match_regex_list,
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from collections.abc import Callable
+    from typing import Any, Optional, TypeVar, Union
+    from sentry_sdk._types import (
+        MonitorConfig,
+        MonitorConfigScheduleType,
+        MonitorConfigScheduleUnit,
+    )
+
+    F = TypeVar("F", bound=Callable[..., Any])
+
+
+try:
+    from celery import Task, Celery  # type: ignore
+    from celery.beat import Scheduler  # type: ignore
+    from celery.schedules import crontab, schedule  # type: ignore
+    from celery.signals import (  # type: ignore
+        task_failure,
+        task_success,
+        task_retry,
+    )
+except ImportError:
+    raise DidNotEnable("Celery not installed")
+
+try:
+    from redbeat.schedulers import RedBeatScheduler  # type: ignore
+except ImportError:
+    RedBeatScheduler = None
+
+
+def _get_headers(task):
+    # type: (Task) -> dict[str, Any]
+    headers = task.request.get("headers") or {}
+
+    # flatten nested headers
+    if "headers" in headers:
+        headers.update(headers["headers"])
+        del headers["headers"]
+
+    headers.update(task.request.get("properties") or {})
+
+    return headers
+
+
+def _get_monitor_config(celery_schedule, app, monitor_name):
+    # type: (Any, Celery, str) -> MonitorConfig
+    monitor_config = {}  # type: MonitorConfig
+    schedule_type = None  # type: Optional[MonitorConfigScheduleType]
+    schedule_value = None  # type: Optional[Union[str, int]]
+    schedule_unit = None  # type: Optional[MonitorConfigScheduleUnit]
+
+    if isinstance(celery_schedule, crontab):
+        schedule_type = "crontab"
+        schedule_value = (
+            "{0._orig_minute} "
+            "{0._orig_hour} "
+            "{0._orig_day_of_month} "
+            "{0._orig_month_of_year} "
+            "{0._orig_day_of_week}".format(celery_schedule)
+        )
+    elif isinstance(celery_schedule, schedule):
+        schedule_type = "interval"
+        (schedule_value, schedule_unit) = _get_humanized_interval(
+            celery_schedule.seconds
+        )
+
+        if schedule_unit == "second":
+            logger.warning(
+                "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
+                monitor_name,
+                schedule_value,
+            )
+            return {}
+
+    else:
+        logger.warning(
+            "Celery schedule type '%s' not supported by Sentry Crons.",
+            type(celery_schedule),
+        )
+        return {}
+
+    monitor_config["schedule"] = {}
+    monitor_config["schedule"]["type"] = schedule_type
+    monitor_config["schedule"]["value"] = schedule_value
+
+    if schedule_unit is not None:
+        monitor_config["schedule"]["unit"] = schedule_unit
+
+    monitor_config["timezone"] = (
+        (
+            hasattr(celery_schedule, "tz")
+            and celery_schedule.tz is not None
+            and str(celery_schedule.tz)
+        )
+        or app.timezone
+        or "UTC"
+    )
+
+    return monitor_config
+
+
+def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
+    # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
+    """
+    Add Sentry Crons information to the schedule_entry headers.
+    """
+    if not integration.monitor_beat_tasks:
+        return
+
+    monitor_name = schedule_entry.name
+
+    task_should_be_excluded = match_regex_list(
+        monitor_name, integration.exclude_beat_tasks
+    )
+    if task_should_be_excluded:
+        return
+
+    celery_schedule = schedule_entry.schedule
+    app = scheduler.app
+
+    monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
+
+    is_supported_schedule = bool(monitor_config)
+    if not is_supported_schedule:
+        return
+
+    headers = schedule_entry.options.pop("headers", {})
+    headers.update(
+        {
+            "sentry-monitor-slug": monitor_name,
+            "sentry-monitor-config": monitor_config,
+        }
+    )
+
+    check_in_id = capture_checkin(
+        monitor_slug=monitor_name,
+        monitor_config=monitor_config,
+        status=MonitorStatus.IN_PROGRESS,
+    )
+    headers.update({"sentry-monitor-check-in-id": check_in_id})
+
+    # Set the Sentry configuration in the options of the ScheduleEntry.
+    # Those will be picked up in `apply_async` and added to the headers.
+    schedule_entry.options["headers"] = headers
+
+
+def _wrap_beat_scheduler(original_function):
+    # type: (Callable[..., Any]) -> Callable[..., Any]
+    """
+    Makes sure that:
+    - a new Sentry trace is started for each task started by Celery Beat and
+      it is propagated to the task.
+    - the Sentry Crons information is set in the Celery Beat task's
+      headers so that is is monitored with Sentry Crons.
+
+    After the patched function is called,
+    Celery Beat will call apply_async to put the task in the queue.
+    """
+    # Patch only once
+    # Can't use __name__ here, because some of our tests mock original_apply_entry
+    already_patched = "sentry_patched_scheduler" in str(original_function)
+    if already_patched:
+        return original_function
+
+    from sentry_sdk.integrations.celery import CeleryIntegration
+
+    def sentry_patched_scheduler(*args, **kwargs):
+        # type: (*Any, **Any) -> None
+        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
+        if integration is None:
+            return original_function(*args, **kwargs)
+
+        # Tasks started by Celery Beat start a new Trace
+        scope = sentry_sdk.get_isolation_scope()
+        scope.set_new_propagation_context()
+        scope._name = "celery-beat"
+
+        scheduler, schedule_entry = args
+        _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
+
+        return original_function(*args, **kwargs)
+
+    return sentry_patched_scheduler
+
+
+def _patch_beat_apply_entry():
+    # type: () -> None
+    Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
+
+
+def _patch_redbeat_maybe_due():
+    # type: () -> None
+    if RedBeatScheduler is None:
+        return
+
+    RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)
+
+
+def _setup_celery_beat_signals(monitor_beat_tasks):
+    # type: (bool) -> None
+    if monitor_beat_tasks:
+        task_success.connect(crons_task_success)
+        task_failure.connect(crons_task_failure)
+        task_retry.connect(crons_task_retry)
+
+
+def crons_task_success(sender, **kwargs):
+    # type: (Task, dict[Any, Any]) -> None
+    logger.debug("celery_task_success %s", sender)
+    headers = _get_headers(sender)
+
+    if "sentry-monitor-slug" not in headers:
+        return
+
+    monitor_config = headers.get("sentry-monitor-config", {})
+
+    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
+
+    capture_checkin(
+        monitor_slug=headers["sentry-monitor-slug"],
+        monitor_config=monitor_config,
+        check_in_id=headers["sentry-monitor-check-in-id"],
+        duration=(
+            _now_seconds_since_epoch() - float(start_timestamp_s)
+            if start_timestamp_s
+            else None
+        ),
+        status=MonitorStatus.OK,
+    )
+
+
+def crons_task_failure(sender, **kwargs):
+    # type: (Task, dict[Any, Any]) -> None
+    logger.debug("celery_task_failure %s", sender)
+    headers = _get_headers(sender)
+
+    if "sentry-monitor-slug" not in headers:
+        return
+
+    monitor_config = headers.get("sentry-monitor-config", {})
+
+    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
+
+    capture_checkin(
+        monitor_slug=headers["sentry-monitor-slug"],
+        monitor_config=monitor_config,
+        check_in_id=headers["sentry-monitor-check-in-id"],
+        duration=(
+            _now_seconds_since_epoch() - float(start_timestamp_s)
+            if start_timestamp_s
+            else None
+        ),
+        status=MonitorStatus.ERROR,
+    )
+
+
+def crons_task_retry(sender, **kwargs):
+    # type: (Task, dict[Any, Any]) -> None
+    logger.debug("celery_task_retry %s", sender)
+    headers = _get_headers(sender)
+
+    if "sentry-monitor-slug" not in headers:
+        return
+
+    monitor_config = headers.get("sentry-monitor-config", {})
+
+    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
+
+    capture_checkin(
+        monitor_slug=headers["sentry-monitor-slug"],
+        monitor_config=monitor_config,
+        check_in_id=headers["sentry-monitor-check-in-id"],
+        duration=(
+            _now_seconds_since_epoch() - float(start_timestamp_s)
+            if start_timestamp_s
+            else None
+        ),
+        status=MonitorStatus.ERROR,
+    )
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/utils.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/utils.py
new file mode 100644
index 00000000..a1961b15
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/utils.py
@@ -0,0 +1,43 @@
+import time
+from typing import TYPE_CHECKING, cast
+
+if TYPE_CHECKING:
+    from typing import Any, Tuple
+    from sentry_sdk._types import MonitorConfigScheduleUnit
+
+
+def _now_seconds_since_epoch():
+    # type: () -> float
+    # We cannot use `time.perf_counter()` when dealing with the duration
+    # of a Celery task, because the start of a Celery task and
+    # the end are recorded in different processes.
+    # Start happens in the Celery Beat process,
+    # the end in a Celery Worker process.
+    return time.time()
+
+
+def _get_humanized_interval(seconds):
+    # type: (float) -> Tuple[int, MonitorConfigScheduleUnit]
+    TIME_UNITS = (  # noqa: N806
+        ("day", 60 * 60 * 24.0),
+        ("hour", 60 * 60.0),
+        ("minute", 60.0),
+    )
+
+    seconds = float(seconds)
+    for unit, divider in TIME_UNITS:
+        if seconds >= divider:
+            interval = int(seconds / divider)
+            return (interval, cast("MonitorConfigScheduleUnit", unit))
+
+    return (int(seconds), "second")
+
+
+class NoOpMgr:
+    def __enter__(self):
+        # type: () -> None
+        return None
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        # type: (Any, Any, Any) -> None
+        return None