aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/apscheduler/executors/base.py
blob: d690a38d28636ad7d1b50f4d91d52098b2c279a3 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import logging
import sys
import traceback
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from traceback import format_tb

from apscheduler.events import (
    EVENT_JOB_ERROR,
    EVENT_JOB_EXECUTED,
    EVENT_JOB_MISSED,
    JobExecutionEvent,
)


class MaxInstancesReachedError(Exception):
    def __init__(self, job):
        super().__init__(
            'Job "%s" has already reached its maximum number of instances (%d)'
            % (job.id, job.max_instances)
        )


class BaseExecutor(metaclass=ABCMeta):
    """Abstract base class that defines the interface that every executor must implement."""

    _scheduler = None
    _lock = None
    _logger = logging.getLogger("apscheduler.executors")

    def __init__(self):
        super().__init__()
        self._instances = defaultdict(lambda: 0)

    def start(self, scheduler, alias):
        """
        Called by the scheduler when the scheduler is being started or when the executor is being
        added to an already running scheduler.

        :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting
            this executor
        :param str|unicode alias: alias of this executor as it was assigned to the scheduler

        """
        self._scheduler = scheduler
        self._lock = scheduler._create_lock()
        self._logger = logging.getLogger(f"apscheduler.executors.{alias}")

    def shutdown(self, wait=True):
        """
        Shuts down this executor.

        :param bool wait: ``True`` to wait until all submitted jobs
            have been executed
        """

    def submit_job(self, job, run_times):
        """
        Submits job for execution.

        :param Job job: job to execute
        :param list[datetime] run_times: list of datetimes specifying
            when the job should have been run
        :raises MaxInstancesReachedError: if the maximum number of
            allowed instances for this job has been reached

        """
        assert self._lock is not None, "This executor has not been started yet"
        with self._lock:
            if self._instances[job.id] >= job.max_instances:
                raise MaxInstancesReachedError(job)

            self._do_submit_job(job, run_times)
            self._instances[job.id] += 1

    @abstractmethod
    def _do_submit_job(self, job, run_times):
        """Performs the actual task of scheduling `run_job` to be called."""

    def _run_job_success(self, job_id, events):
        """
        Called by the executor with the list of generated events when :func:`run_job` has been
        successfully called.

        """
        with self._lock:
            self._instances[job_id] -= 1
            if self._instances[job_id] == 0:
                del self._instances[job_id]

        for event in events:
            self._scheduler._dispatch_event(event)

    def _run_job_error(self, job_id, exc, traceback=None):
        """Called by the executor with the exception if there is an error  calling `run_job`."""
        with self._lock:
            self._instances[job_id] -= 1
            if self._instances[job_id] == 0:
                del self._instances[job_id]

        exc_info = (exc.__class__, exc, traceback)
        self._logger.error("Error running job %s", job_id, exc_info=exc_info)


def run_job(job, jobstore_alias, run_times, logger_name):
    """
    Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
    scheduler.

    """
    events = []
    logger = logging.getLogger(logger_name)
    for run_time in run_times:
        # See if the job missed its run time window, and handle
        # possible misfires accordingly
        if job.misfire_grace_time is not None:
            difference = datetime.now(timezone.utc) - run_time
            grace_time = timedelta(seconds=job.misfire_grace_time)
            if difference > grace_time:
                events.append(
                    JobExecutionEvent(
                        EVENT_JOB_MISSED, job.id, jobstore_alias, run_time
                    )
                )
                logger.warning('Run time of job "%s" was missed by %s', job, difference)
                continue

        logger.info('Running job "%s" (scheduled at %s)', job, run_time)
        try:
            retval = job.func(*job.args, **job.kwargs)
        except BaseException:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = "".join(format_tb(tb))
            events.append(
                JobExecutionEvent(
                    EVENT_JOB_ERROR,
                    job.id,
                    jobstore_alias,
                    run_time,
                    exception=exc,
                    traceback=formatted_tb,
                )
            )
            logger.exception('Job "%s" raised an exception', job)

            # This is to prevent cyclic references that would lead to memory leaks
            traceback.clear_frames(tb)
            del tb
        else:
            events.append(
                JobExecutionEvent(
                    EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, retval=retval
                )
            )
            logger.info('Job "%s" executed successfully', job)

    return events


async def run_coroutine_job(job, jobstore_alias, run_times, logger_name):
    """Coroutine version of run_job()."""
    events = []
    logger = logging.getLogger(logger_name)
    for run_time in run_times:
        # See if the job missed its run time window, and handle possible misfires accordingly
        if job.misfire_grace_time is not None:
            difference = datetime.now(timezone.utc) - run_time
            grace_time = timedelta(seconds=job.misfire_grace_time)
            if difference > grace_time:
                events.append(
                    JobExecutionEvent(
                        EVENT_JOB_MISSED, job.id, jobstore_alias, run_time
                    )
                )
                logger.warning('Run time of job "%s" was missed by %s', job, difference)
                continue

        logger.info('Running job "%s" (scheduled at %s)', job, run_time)
        try:
            retval = await job.func(*job.args, **job.kwargs)
        except BaseException:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = "".join(format_tb(tb))
            events.append(
                JobExecutionEvent(
                    EVENT_JOB_ERROR,
                    job.id,
                    jobstore_alias,
                    run_time,
                    exception=exc,
                    traceback=formatted_tb,
                )
            )
            logger.exception('Job "%s" raised an exception', job)
            traceback.clear_frames(tb)
        else:
            events.append(
                JobExecutionEvent(
                    EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, retval=retval
                )
            )
            logger.info('Job "%s" executed successfully', job)

    return events