about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py293
1 files changed, 293 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py
new file mode 100644
index 00000000..ddbc8561
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py
@@ -0,0 +1,293 @@
+import sentry_sdk
+from sentry_sdk.crons import capture_checkin, MonitorStatus
+from sentry_sdk.integrations import DidNotEnable
+from sentry_sdk.integrations.celery.utils import (
+    _get_humanized_interval,
+    _now_seconds_since_epoch,
+)
+from sentry_sdk.utils import (
+    logger,
+    match_regex_list,
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from collections.abc import Callable
+    from typing import Any, Optional, TypeVar, Union
+    from sentry_sdk._types import (
+        MonitorConfig,
+        MonitorConfigScheduleType,
+        MonitorConfigScheduleUnit,
+    )
+
+    F = TypeVar("F", bound=Callable[..., Any])
+
+
+try:
+    from celery import Task, Celery  # type: ignore
+    from celery.beat import Scheduler  # type: ignore
+    from celery.schedules import crontab, schedule  # type: ignore
+    from celery.signals import (  # type: ignore
+        task_failure,
+        task_success,
+        task_retry,
+    )
+except ImportError:
+    raise DidNotEnable("Celery not installed")
+
+try:
+    from redbeat.schedulers import RedBeatScheduler  # type: ignore
+except ImportError:
+    RedBeatScheduler = None
+
+
+def _get_headers(task):
+    # type: (Task) -> dict[str, Any]
+    headers = task.request.get("headers") or {}
+
+    # flatten nested headers
+    if "headers" in headers:
+        headers.update(headers["headers"])
+        del headers["headers"]
+
+    headers.update(task.request.get("properties") or {})
+
+    return headers
+
+
+def _get_monitor_config(celery_schedule, app, monitor_name):
+    # type: (Any, Celery, str) -> MonitorConfig
+    monitor_config = {}  # type: MonitorConfig
+    schedule_type = None  # type: Optional[MonitorConfigScheduleType]
+    schedule_value = None  # type: Optional[Union[str, int]]
+    schedule_unit = None  # type: Optional[MonitorConfigScheduleUnit]
+
+    if isinstance(celery_schedule, crontab):
+        schedule_type = "crontab"
+        schedule_value = (
+            "{0._orig_minute} "
+            "{0._orig_hour} "
+            "{0._orig_day_of_month} "
+            "{0._orig_month_of_year} "
+            "{0._orig_day_of_week}".format(celery_schedule)
+        )
+    elif isinstance(celery_schedule, schedule):
+        schedule_type = "interval"
+        (schedule_value, schedule_unit) = _get_humanized_interval(
+            celery_schedule.seconds
+        )
+
+        if schedule_unit == "second":
+            logger.warning(
+                "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
+                monitor_name,
+                schedule_value,
+            )
+            return {}
+
+    else:
+        logger.warning(
+            "Celery schedule type '%s' not supported by Sentry Crons.",
+            type(celery_schedule),
+        )
+        return {}
+
+    monitor_config["schedule"] = {}
+    monitor_config["schedule"]["type"] = schedule_type
+    monitor_config["schedule"]["value"] = schedule_value
+
+    if schedule_unit is not None:
+        monitor_config["schedule"]["unit"] = schedule_unit
+
+    monitor_config["timezone"] = (
+        (
+            hasattr(celery_schedule, "tz")
+            and celery_schedule.tz is not None
+            and str(celery_schedule.tz)
+        )
+        or app.timezone
+        or "UTC"
+    )
+
+    return monitor_config
+
+
+def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
+    # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
+    """
+    Add Sentry Crons information to the schedule_entry headers.
+    """
+    if not integration.monitor_beat_tasks:
+        return
+
+    monitor_name = schedule_entry.name
+
+    task_should_be_excluded = match_regex_list(
+        monitor_name, integration.exclude_beat_tasks
+    )
+    if task_should_be_excluded:
+        return
+
+    celery_schedule = schedule_entry.schedule
+    app = scheduler.app
+
+    monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
+
+    is_supported_schedule = bool(monitor_config)
+    if not is_supported_schedule:
+        return
+
+    headers = schedule_entry.options.pop("headers", {})
+    headers.update(
+        {
+            "sentry-monitor-slug": monitor_name,
+            "sentry-monitor-config": monitor_config,
+        }
+    )
+
+    check_in_id = capture_checkin(
+        monitor_slug=monitor_name,
+        monitor_config=monitor_config,
+        status=MonitorStatus.IN_PROGRESS,
+    )
+    headers.update({"sentry-monitor-check-in-id": check_in_id})
+
+    # Set the Sentry configuration in the options of the ScheduleEntry.
+    # Those will be picked up in `apply_async` and added to the headers.
+    schedule_entry.options["headers"] = headers
+
+
+def _wrap_beat_scheduler(original_function):
+    # type: (Callable[..., Any]) -> Callable[..., Any]
+    """
+    Makes sure that:
+    - a new Sentry trace is started for each task started by Celery Beat and
+      it is propagated to the task.
+    - the Sentry Crons information is set in the Celery Beat task's
+      headers so that is is monitored with Sentry Crons.
+
+    After the patched function is called,
+    Celery Beat will call apply_async to put the task in the queue.
+    """
+    # Patch only once
+    # Can't use __name__ here, because some of our tests mock original_apply_entry
+    already_patched = "sentry_patched_scheduler" in str(original_function)
+    if already_patched:
+        return original_function
+
+    from sentry_sdk.integrations.celery import CeleryIntegration
+
+    def sentry_patched_scheduler(*args, **kwargs):
+        # type: (*Any, **Any) -> None
+        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
+        if integration is None:
+            return original_function(*args, **kwargs)
+
+        # Tasks started by Celery Beat start a new Trace
+        scope = sentry_sdk.get_isolation_scope()
+        scope.set_new_propagation_context()
+        scope._name = "celery-beat"
+
+        scheduler, schedule_entry = args
+        _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
+
+        return original_function(*args, **kwargs)
+
+    return sentry_patched_scheduler
+
+
+def _patch_beat_apply_entry():
+    # type: () -> None
+    Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
+
+
+def _patch_redbeat_maybe_due():
+    # type: () -> None
+    if RedBeatScheduler is None:
+        return
+
+    RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)
+
+
+def _setup_celery_beat_signals(monitor_beat_tasks):
+    # type: (bool) -> None
+    if monitor_beat_tasks:
+        task_success.connect(crons_task_success)
+        task_failure.connect(crons_task_failure)
+        task_retry.connect(crons_task_retry)
+
+
+def crons_task_success(sender, **kwargs):
+    # type: (Task, dict[Any, Any]) -> None
+    logger.debug("celery_task_success %s", sender)
+    headers = _get_headers(sender)
+
+    if "sentry-monitor-slug" not in headers:
+        return
+
+    monitor_config = headers.get("sentry-monitor-config", {})
+
+    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
+
+    capture_checkin(
+        monitor_slug=headers["sentry-monitor-slug"],
+        monitor_config=monitor_config,
+        check_in_id=headers["sentry-monitor-check-in-id"],
+        duration=(
+            _now_seconds_since_epoch() - float(start_timestamp_s)
+            if start_timestamp_s
+            else None
+        ),
+        status=MonitorStatus.OK,
+    )
+
+
+def crons_task_failure(sender, **kwargs):
+    # type: (Task, dict[Any, Any]) -> None
+    logger.debug("celery_task_failure %s", sender)
+    headers = _get_headers(sender)
+
+    if "sentry-monitor-slug" not in headers:
+        return
+
+    monitor_config = headers.get("sentry-monitor-config", {})
+
+    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
+
+    capture_checkin(
+        monitor_slug=headers["sentry-monitor-slug"],
+        monitor_config=monitor_config,
+        check_in_id=headers["sentry-monitor-check-in-id"],
+        duration=(
+            _now_seconds_since_epoch() - float(start_timestamp_s)
+            if start_timestamp_s
+            else None
+        ),
+        status=MonitorStatus.ERROR,
+    )
+
+
+def crons_task_retry(sender, **kwargs):
+    # type: (Task, dict[Any, Any]) -> None
+    logger.debug("celery_task_retry %s", sender)
+    headers = _get_headers(sender)
+
+    if "sentry-monitor-slug" not in headers:
+        return
+
+    monitor_config = headers.get("sentry-monitor-config", {})
+
+    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
+
+    capture_checkin(
+        monitor_slug=headers["sentry-monitor-slug"],
+        monitor_config=monitor_config,
+        check_in_id=headers["sentry-monitor-check-in-id"],
+        duration=(
+            _now_seconds_since_epoch() - float(start_timestamp_s)
+            if start_timestamp_s
+            else None
+        ),
+        status=MonitorStatus.ERROR,
+    )