diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py new file mode 100644 index 00000000..ddbc8561 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py @@ -0,0 +1,293 @@ +import sentry_sdk +from sentry_sdk.crons import capture_checkin, MonitorStatus +from sentry_sdk.integrations import DidNotEnable +from sentry_sdk.integrations.celery.utils import ( + _get_humanized_interval, + _now_seconds_since_epoch, +) +from sentry_sdk.utils import ( + logger, + match_regex_list, +) + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Callable + from typing import Any, Optional, TypeVar, Union + from sentry_sdk._types import ( + MonitorConfig, + MonitorConfigScheduleType, + MonitorConfigScheduleUnit, + ) + + F = TypeVar("F", bound=Callable[..., Any]) + + +try: + from celery import Task, Celery # type: ignore + from celery.beat import Scheduler # type: ignore + from celery.schedules import crontab, schedule # type: ignore + from celery.signals import ( # type: ignore + task_failure, + task_success, + task_retry, + ) +except ImportError: + raise DidNotEnable("Celery not installed") + +try: + from redbeat.schedulers import RedBeatScheduler # type: ignore +except ImportError: + RedBeatScheduler = None + + +def _get_headers(task): + # type: (Task) -> dict[str, Any] + headers = task.request.get("headers") or {} + + # flatten nested headers + if "headers" in headers: + headers.update(headers["headers"]) + del headers["headers"] + + headers.update(task.request.get("properties") or {}) + + return headers + + +def _get_monitor_config(celery_schedule, app, monitor_name): + # type: (Any, Celery, str) -> MonitorConfig + monitor_config = {} # type: MonitorConfig + schedule_type = None # type: Optional[MonitorConfigScheduleType] + schedule_value = None # type: Optional[Union[str, int]] + schedule_unit = None # type: Optional[MonitorConfigScheduleUnit] + + if isinstance(celery_schedule, crontab): + schedule_type = "crontab" + schedule_value = ( + "{0._orig_minute} " + "{0._orig_hour} " + "{0._orig_day_of_month} " + "{0._orig_month_of_year} " + "{0._orig_day_of_week}".format(celery_schedule) + ) + elif isinstance(celery_schedule, schedule): + schedule_type = "interval" + (schedule_value, schedule_unit) = _get_humanized_interval( + celery_schedule.seconds + ) + + if schedule_unit == "second": + logger.warning( + "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.", + monitor_name, + schedule_value, + ) + return {} + + else: + logger.warning( + "Celery schedule type '%s' not supported by Sentry Crons.", + type(celery_schedule), + ) + return {} + + monitor_config["schedule"] = {} + monitor_config["schedule"]["type"] = schedule_type + monitor_config["schedule"]["value"] = schedule_value + + if schedule_unit is not None: + monitor_config["schedule"]["unit"] = schedule_unit + + monitor_config["timezone"] = ( + ( + hasattr(celery_schedule, "tz") + and celery_schedule.tz is not None + and str(celery_schedule.tz) + ) + or app.timezone + or "UTC" + ) + + return monitor_config + + +def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration): + # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None + """ + Add Sentry Crons information to the schedule_entry headers. + """ + if not integration.monitor_beat_tasks: + return + + monitor_name = schedule_entry.name + + task_should_be_excluded = match_regex_list( + monitor_name, integration.exclude_beat_tasks + ) + if task_should_be_excluded: + return + + celery_schedule = schedule_entry.schedule + app = scheduler.app + + monitor_config = _get_monitor_config(celery_schedule, app, monitor_name) + + is_supported_schedule = bool(monitor_config) + if not is_supported_schedule: + return + + headers = schedule_entry.options.pop("headers", {}) + headers.update( + { + "sentry-monitor-slug": monitor_name, + "sentry-monitor-config": monitor_config, + } + ) + + check_in_id = capture_checkin( + monitor_slug=monitor_name, + monitor_config=monitor_config, + status=MonitorStatus.IN_PROGRESS, + ) + headers.update({"sentry-monitor-check-in-id": check_in_id}) + + # Set the Sentry configuration in the options of the ScheduleEntry. + # Those will be picked up in `apply_async` and added to the headers. + schedule_entry.options["headers"] = headers + + +def _wrap_beat_scheduler(original_function): + # type: (Callable[..., Any]) -> Callable[..., Any] + """ + Makes sure that: + - a new Sentry trace is started for each task started by Celery Beat and + it is propagated to the task. + - the Sentry Crons information is set in the Celery Beat task's + headers so that is is monitored with Sentry Crons. + + After the patched function is called, + Celery Beat will call apply_async to put the task in the queue. + """ + # Patch only once + # Can't use __name__ here, because some of our tests mock original_apply_entry + already_patched = "sentry_patched_scheduler" in str(original_function) + if already_patched: + return original_function + + from sentry_sdk.integrations.celery import CeleryIntegration + + def sentry_patched_scheduler(*args, **kwargs): + # type: (*Any, **Any) -> None + integration = sentry_sdk.get_client().get_integration(CeleryIntegration) + if integration is None: + return original_function(*args, **kwargs) + + # Tasks started by Celery Beat start a new Trace + scope = sentry_sdk.get_isolation_scope() + scope.set_new_propagation_context() + scope._name = "celery-beat" + + scheduler, schedule_entry = args + _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration) + + return original_function(*args, **kwargs) + + return sentry_patched_scheduler + + +def _patch_beat_apply_entry(): + # type: () -> None + Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry) + + +def _patch_redbeat_maybe_due(): + # type: () -> None + if RedBeatScheduler is None: + return + + RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due) + + +def _setup_celery_beat_signals(monitor_beat_tasks): + # type: (bool) -> None + if monitor_beat_tasks: + task_success.connect(crons_task_success) + task_failure.connect(crons_task_failure) + task_retry.connect(crons_task_retry) + + +def crons_task_success(sender, **kwargs): + # type: (Task, dict[Any, Any]) -> None + logger.debug("celery_task_success %s", sender) + headers = _get_headers(sender) + + if "sentry-monitor-slug" not in headers: + return + + monitor_config = headers.get("sentry-monitor-config", {}) + + start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s") + + capture_checkin( + monitor_slug=headers["sentry-monitor-slug"], + monitor_config=monitor_config, + check_in_id=headers["sentry-monitor-check-in-id"], + duration=( + _now_seconds_since_epoch() - float(start_timestamp_s) + if start_timestamp_s + else None + ), + status=MonitorStatus.OK, + ) + + +def crons_task_failure(sender, **kwargs): + # type: (Task, dict[Any, Any]) -> None + logger.debug("celery_task_failure %s", sender) + headers = _get_headers(sender) + + if "sentry-monitor-slug" not in headers: + return + + monitor_config = headers.get("sentry-monitor-config", {}) + + start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s") + + capture_checkin( + monitor_slug=headers["sentry-monitor-slug"], + monitor_config=monitor_config, + check_in_id=headers["sentry-monitor-check-in-id"], + duration=( + _now_seconds_since_epoch() - float(start_timestamp_s) + if start_timestamp_s + else None + ), + status=MonitorStatus.ERROR, + ) + + +def crons_task_retry(sender, **kwargs): + # type: (Task, dict[Any, Any]) -> None + logger.debug("celery_task_retry %s", sender) + headers = _get_headers(sender) + + if "sentry-monitor-slug" not in headers: + return + + monitor_config = headers.get("sentry-monitor-config", {}) + + start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s") + + capture_checkin( + monitor_slug=headers["sentry-monitor-slug"], + monitor_config=monitor_config, + check_in_id=headers["sentry-monitor-check-in-id"], + duration=( + _now_seconds_since_epoch() - float(start_timestamp_s) + if start_timestamp_s + else None + ), + status=MonitorStatus.ERROR, + ) |