aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/apscheduler/schedulers/base.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/schedulers/base.py')
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/schedulers/base.py1264
1 files changed, 1264 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/schedulers/base.py b/.venv/lib/python3.12/site-packages/apscheduler/schedulers/base.py
new file mode 100644
index 00000000..7d713c75
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/schedulers/base.py
@@ -0,0 +1,1264 @@
+import sys
+import warnings
+from abc import ABCMeta, abstractmethod
+from collections.abc import Mapping, MutableMapping
+from contextlib import ExitStack
+from datetime import datetime, timedelta
+from importlib.metadata import entry_points
+from logging import getLogger
+from threading import TIMEOUT_MAX, RLock
+
+from tzlocal import get_localzone
+
+from apscheduler.events import (
+ EVENT_ALL,
+ EVENT_ALL_JOBS_REMOVED,
+ EVENT_EXECUTOR_ADDED,
+ EVENT_EXECUTOR_REMOVED,
+ EVENT_JOB_ADDED,
+ EVENT_JOB_MAX_INSTANCES,
+ EVENT_JOB_MODIFIED,
+ EVENT_JOB_REMOVED,
+ EVENT_JOB_SUBMITTED,
+ EVENT_JOBSTORE_ADDED,
+ EVENT_JOBSTORE_REMOVED,
+ EVENT_SCHEDULER_PAUSED,
+ EVENT_SCHEDULER_RESUMED,
+ EVENT_SCHEDULER_SHUTDOWN,
+ EVENT_SCHEDULER_STARTED,
+ JobEvent,
+ JobSubmissionEvent,
+ SchedulerEvent,
+)
+from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError
+from apscheduler.executors.pool import ThreadPoolExecutor
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.jobstores.memory import MemoryJobStore
+from apscheduler.schedulers import (
+ SchedulerAlreadyRunningError,
+ SchedulerNotRunningError,
+)
+from apscheduler.triggers.base import BaseTrigger
+from apscheduler.util import (
+ asbool,
+ asint,
+ astimezone,
+ maybe_ref,
+ obj_to_ref,
+ ref_to_obj,
+ undefined,
+)
+
+#: constant indicating a scheduler's stopped state
+STATE_STOPPED = 0
+#: constant indicating a scheduler's running state (started and processing jobs)
+STATE_RUNNING = 1
+#: constant indicating a scheduler's paused state (started but not processing jobs)
+STATE_PAUSED = 2
+
+
+class BaseScheduler(metaclass=ABCMeta):
+ """
+ Abstract base class for all schedulers.
+
+ Takes the following keyword arguments:
+
+ :param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to
+ apscheduler.scheduler)
+ :param str|datetime.tzinfo timezone: the default time zone (defaults to the local timezone)
+ :param int|float jobstore_retry_interval: the minimum number of seconds to wait between
+ retries in the scheduler's main loop if the job store raises an exception when getting
+ the list of due jobs
+ :param dict job_defaults: default values for newly added jobs
+ :param dict jobstores: a dictionary of job store alias -> job store instance or configuration
+ dict
+ :param dict executors: a dictionary of executor alias -> executor instance or configuration
+ dict
+
+ :ivar int state: current running state of the scheduler (one of the following constants from
+ ``apscheduler.schedulers.base``: ``STATE_STOPPED``, ``STATE_RUNNING``, ``STATE_PAUSED``)
+
+ .. seealso:: :ref:`scheduler-config`
+ """
+
+ # The `group=...` API is only available in the backport, used in <=3.7, and in std>=3.10.
+ if (3, 8) <= sys.version_info < (3, 10):
+ _trigger_plugins = {
+ ep.name: ep for ep in entry_points()["apscheduler.triggers"]
+ }
+ _executor_plugins = {
+ ep.name: ep for ep in entry_points()["apscheduler.executors"]
+ }
+ _jobstore_plugins = {
+ ep.name: ep for ep in entry_points()["apscheduler.jobstores"]
+ }
+ else:
+ _trigger_plugins = {
+ ep.name: ep for ep in entry_points(group="apscheduler.triggers")
+ }
+ _executor_plugins = {
+ ep.name: ep for ep in entry_points(group="apscheduler.executors")
+ }
+ _jobstore_plugins = {
+ ep.name: ep for ep in entry_points(group="apscheduler.jobstores")
+ }
+
+ _trigger_classes = {}
+ _executor_classes = {}
+ _jobstore_classes = {}
+
+ #
+ # Public API
+ #
+
+ def __init__(self, gconfig={}, **options):
+ super().__init__()
+ self._executors = {}
+ self._executors_lock = self._create_lock()
+ self._jobstores = {}
+ self._jobstores_lock = self._create_lock()
+ self._listeners = []
+ self._listeners_lock = self._create_lock()
+ self._pending_jobs = []
+ self.state = STATE_STOPPED
+ self.configure(gconfig, **options)
+
+ def __getstate__(self):
+ raise TypeError(
+ "Schedulers cannot be serialized. Ensure that you are not passing a "
+ "scheduler instance as an argument to a job, or scheduling an instance "
+ "method where the instance contains a scheduler as an attribute."
+ )
+
+ def configure(self, gconfig={}, prefix="apscheduler.", **options):
+ """
+ Reconfigures the scheduler with the given options.
+
+ Can only be done when the scheduler isn't running.
+
+ :param dict gconfig: a "global" configuration dictionary whose values can be overridden by
+ keyword arguments to this method
+ :param str|unicode prefix: pick only those keys from ``gconfig`` that are prefixed with
+ this string (pass an empty string or ``None`` to use all keys)
+ :raises SchedulerAlreadyRunningError: if the scheduler is already running
+
+ """
+ if self.state != STATE_STOPPED:
+ raise SchedulerAlreadyRunningError
+
+ # If a non-empty prefix was given, strip it from the keys in the
+ # global configuration dict
+ if prefix:
+ prefixlen = len(prefix)
+ gconfig = dict(
+ (key[prefixlen:], value)
+ for key, value in gconfig.items()
+ if key.startswith(prefix)
+ )
+
+ # Create a structure from the dotted options
+ # (e.g. "a.b.c = d" -> {'a': {'b': {'c': 'd'}}})
+ config = {}
+ for key, value in gconfig.items():
+ parts = key.split(".")
+ parent = config
+ key = parts.pop(0)
+ while parts:
+ parent = parent.setdefault(key, {})
+ key = parts.pop(0)
+ parent[key] = value
+
+ # Override any options with explicit keyword arguments
+ config.update(options)
+ self._configure(config)
+
+ def start(self, paused=False):
+ """
+ Start the configured executors and job stores and begin processing scheduled jobs.
+
+ :param bool paused: if ``True``, don't start job processing until :meth:`resume` is called
+ :raises SchedulerAlreadyRunningError: if the scheduler is already running
+ :raises RuntimeError: if running under uWSGI with threads disabled
+
+ """
+ if self.state != STATE_STOPPED:
+ raise SchedulerAlreadyRunningError
+
+ self._check_uwsgi()
+
+ with self._executors_lock:
+ # Create a default executor if nothing else is configured
+ if "default" not in self._executors:
+ self.add_executor(self._create_default_executor(), "default")
+
+ # Start all the executors
+ for alias, executor in self._executors.items():
+ executor.start(self, alias)
+
+ with self._jobstores_lock:
+ # Create a default job store if nothing else is configured
+ if "default" not in self._jobstores:
+ self.add_jobstore(self._create_default_jobstore(), "default")
+
+ # Start all the job stores
+ for alias, store in self._jobstores.items():
+ store.start(self, alias)
+
+ # Schedule all pending jobs
+ for job, jobstore_alias, replace_existing in self._pending_jobs:
+ self._real_add_job(job, jobstore_alias, replace_existing)
+ del self._pending_jobs[:]
+
+ self.state = STATE_PAUSED if paused else STATE_RUNNING
+ self._logger.info("Scheduler started")
+ self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_STARTED))
+
+ if not paused:
+ self.wakeup()
+
+ @abstractmethod
+ def shutdown(self, wait=True):
+ """
+ Shuts down the scheduler, along with its executors and job stores.
+
+ Does not interrupt any currently running jobs.
+
+ :param bool wait: ``True`` to wait until all currently executing jobs have finished
+ :raises SchedulerNotRunningError: if the scheduler has not been started yet
+
+ """
+ if self.state == STATE_STOPPED:
+ raise SchedulerNotRunningError
+
+ self.state = STATE_STOPPED
+
+ # Shut down all executors
+ with self._executors_lock, self._jobstores_lock:
+ for executor in self._executors.values():
+ executor.shutdown(wait)
+
+ # Shut down all job stores
+ for jobstore in self._jobstores.values():
+ jobstore.shutdown()
+
+ self._logger.info("Scheduler has been shut down")
+ self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))
+
+ def pause(self):
+ """
+ Pause job processing in the scheduler.
+
+ This will prevent the scheduler from waking up to do job processing until :meth:`resume`
+ is called. It will not however stop any already running job processing.
+
+ """
+ if self.state == STATE_STOPPED:
+ raise SchedulerNotRunningError
+ elif self.state == STATE_RUNNING:
+ self.state = STATE_PAUSED
+ self._logger.info("Paused scheduler job processing")
+ self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_PAUSED))
+
+ def resume(self):
+ """Resume job processing in the scheduler."""
+ if self.state == STATE_STOPPED:
+ raise SchedulerNotRunningError
+ elif self.state == STATE_PAUSED:
+ self.state = STATE_RUNNING
+ self._logger.info("Resumed scheduler job processing")
+ self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_RESUMED))
+ self.wakeup()
+
+ @property
+ def running(self):
+ """
+ Return ``True`` if the scheduler has been started.
+
+ This is a shortcut for ``scheduler.state != STATE_STOPPED``.
+
+ """
+ return self.state != STATE_STOPPED
+
+ def add_executor(self, executor, alias="default", **executor_opts):
+ """
+ Adds an executor to this scheduler.
+
+ Any extra keyword arguments will be passed to the executor plugin's constructor, assuming
+ that the first argument is the name of an executor plugin.
+
+ :param str|unicode|apscheduler.executors.base.BaseExecutor executor: either an executor
+ instance or the name of an executor plugin
+ :param str|unicode alias: alias for the scheduler
+ :raises ValueError: if there is already an executor by the given alias
+
+ """
+ with self._executors_lock:
+ if alias in self._executors:
+ raise ValueError(
+ f'This scheduler already has an executor by the alias of "{alias}"'
+ )
+
+ if isinstance(executor, BaseExecutor):
+ self._executors[alias] = executor
+ elif isinstance(executor, str):
+ self._executors[alias] = executor = self._create_plugin_instance(
+ "executor", executor, executor_opts
+ )
+ else:
+ raise TypeError(
+ f"Expected an executor instance or a string, got {executor.__class__.__name__} instead"
+ )
+
+ # Start the executor right away if the scheduler is running
+ if self.state != STATE_STOPPED:
+ executor.start(self, alias)
+
+ self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_ADDED, alias))
+
+ def remove_executor(self, alias, shutdown=True):
+ """
+ Removes the executor by the given alias from this scheduler.
+
+ :param str|unicode alias: alias of the executor
+ :param bool shutdown: ``True`` to shut down the executor after
+ removing it
+
+ """
+ with self._executors_lock:
+ executor = self._lookup_executor(alias)
+ del self._executors[alias]
+
+ if shutdown:
+ executor.shutdown()
+
+ self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_REMOVED, alias))
+
+ def add_jobstore(self, jobstore, alias="default", **jobstore_opts):
+ """
+ Adds a job store to this scheduler.
+
+ Any extra keyword arguments will be passed to the job store plugin's constructor, assuming
+ that the first argument is the name of a job store plugin.
+
+ :param str|unicode|apscheduler.jobstores.base.BaseJobStore jobstore: job store to be added
+ :param str|unicode alias: alias for the job store
+ :raises ValueError: if there is already a job store by the given alias
+
+ """
+ with self._jobstores_lock:
+ if alias in self._jobstores:
+ raise ValueError(
+ f'This scheduler already has a job store by the alias of "{alias}"'
+ )
+
+ if isinstance(jobstore, BaseJobStore):
+ self._jobstores[alias] = jobstore
+ elif isinstance(jobstore, str):
+ self._jobstores[alias] = jobstore = self._create_plugin_instance(
+ "jobstore", jobstore, jobstore_opts
+ )
+ else:
+ raise TypeError(
+ f"Expected a job store instance or a string, got {jobstore.__class__.__name__} instead"
+ )
+
+ # Start the job store right away if the scheduler isn't stopped
+ if self.state != STATE_STOPPED:
+ jobstore.start(self, alias)
+
+ # Notify listeners that a new job store has been added
+ self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_ADDED, alias))
+
+ # Notify the scheduler so it can scan the new job store for jobs
+ if self.state != STATE_STOPPED:
+ self.wakeup()
+
+ def remove_jobstore(self, alias, shutdown=True):
+ """
+ Removes the job store by the given alias from this scheduler.
+
+ :param str|unicode alias: alias of the job store
+ :param bool shutdown: ``True`` to shut down the job store after removing it
+
+ """
+ with self._jobstores_lock:
+ jobstore = self._lookup_jobstore(alias)
+ del self._jobstores[alias]
+
+ if shutdown:
+ jobstore.shutdown()
+
+ self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_REMOVED, alias))
+
+ def add_listener(self, callback, mask=EVENT_ALL):
+ """
+ add_listener(callback, mask=EVENT_ALL)
+
+ Adds a listener for scheduler events.
+
+ When a matching event occurs, ``callback`` is executed with the event object as its
+ sole argument. If the ``mask`` parameter is not provided, the callback will receive events
+ of all types.
+
+ :param callback: any callable that takes one argument
+ :param int mask: bitmask that indicates which events should be
+ listened to
+
+ .. seealso:: :mod:`apscheduler.events`
+ .. seealso:: :ref:`scheduler-events`
+
+ """
+ with self._listeners_lock:
+ self._listeners.append((callback, mask))
+
+ def remove_listener(self, callback):
+ """Removes a previously added event listener."""
+
+ with self._listeners_lock:
+ for i, (cb, _) in enumerate(self._listeners):
+ if callback == cb:
+ del self._listeners[i]
+
+ def add_job(
+ self,
+ func,
+ trigger=None,
+ args=None,
+ kwargs=None,
+ id=None,
+ name=None,
+ misfire_grace_time=undefined,
+ coalesce=undefined,
+ max_instances=undefined,
+ next_run_time=undefined,
+ jobstore="default",
+ executor="default",
+ replace_existing=False,
+ **trigger_args,
+ ):
+ """
+ add_job(func, trigger=None, args=None, kwargs=None, id=None, \
+ name=None, misfire_grace_time=undefined, coalesce=undefined, \
+ max_instances=undefined, next_run_time=undefined, \
+ jobstore='default', executor='default', \
+ replace_existing=False, **trigger_args)
+
+ Adds the given job to the job list and wakes up the scheduler if it's already running.
+
+ Any option that defaults to ``undefined`` will be replaced with the corresponding default
+ value when the job is scheduled (which happens when the scheduler is started, or
+ immediately if the scheduler is already running).
+
+ The ``func`` argument can be given either as a callable object or a textual reference in
+ the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
+ importable module and the second half is a reference to the callable object, relative to
+ the module.
+
+ The ``trigger`` argument can either be:
+ #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
+ any extra keyword arguments to this method are passed on to the trigger's constructor
+ #. an instance of a trigger class
+
+ :param func: callable (or a textual reference to one) to run at the given time
+ :param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
+ ``func`` is called
+ :param list|tuple args: list of positional arguments to call func with
+ :param dict kwargs: dict of keyword arguments to call func with
+ :param str|unicode id: explicit identifier for the job (for modifying it later)
+ :param str|unicode name: textual description of the job
+ :param int misfire_grace_time: seconds after the designated runtime that the job is still
+ allowed to be run (or ``None`` to allow the job to run no matter how late it is)
+ :param bool coalesce: run once instead of many times if the scheduler determines that the
+ job should be run more than once in succession
+ :param int max_instances: maximum number of concurrently running instances allowed for this
+ job
+ :param datetime next_run_time: when to first run the job, regardless of the trigger (pass
+ ``None`` to add the job as paused)
+ :param str|unicode jobstore: alias of the job store to store the job in
+ :param str|unicode executor: alias of the executor to run the job with
+ :param bool replace_existing: ``True`` to replace an existing job with the same ``id``
+ (but retain the number of runs from the existing one)
+ :rtype: Job
+
+ """
+ job_kwargs = {
+ "trigger": self._create_trigger(trigger, trigger_args),
+ "executor": executor,
+ "func": func,
+ "args": tuple(args) if args is not None else (),
+ "kwargs": dict(kwargs) if kwargs is not None else {},
+ "id": id,
+ "name": name,
+ "misfire_grace_time": misfire_grace_time,
+ "coalesce": coalesce,
+ "max_instances": max_instances,
+ "next_run_time": next_run_time,
+ }
+ job_kwargs = dict(
+ (key, value) for key, value in job_kwargs.items() if value is not undefined
+ )
+ job = Job(self, **job_kwargs)
+
+ # Don't really add jobs to job stores before the scheduler is up and running
+ with self._jobstores_lock:
+ if self.state == STATE_STOPPED:
+ self._pending_jobs.append((job, jobstore, replace_existing))
+ self._logger.info(
+ "Adding job tentatively -- it will be properly scheduled when "
+ "the scheduler starts"
+ )
+ else:
+ self._real_add_job(job, jobstore, replace_existing)
+
+ return job
+
+ def scheduled_job(
+ self,
+ trigger,
+ args=None,
+ kwargs=None,
+ id=None,
+ name=None,
+ misfire_grace_time=undefined,
+ coalesce=undefined,
+ max_instances=undefined,
+ next_run_time=undefined,
+ jobstore="default",
+ executor="default",
+ **trigger_args,
+ ):
+ """
+ scheduled_job(trigger, args=None, kwargs=None, id=None, \
+ name=None, misfire_grace_time=undefined, \
+ coalesce=undefined, max_instances=undefined, \
+ next_run_time=undefined, jobstore='default', \
+ executor='default',**trigger_args)
+
+ A decorator version of :meth:`add_job`, except that ``replace_existing`` is always
+ ``True``.
+
+ .. important:: The ``id`` argument must be given if scheduling a job in a persistent job
+ store. The scheduler cannot, however, enforce this requirement.
+
+ """
+
+ def inner(func):
+ self.add_job(
+ func,
+ trigger,
+ args,
+ kwargs,
+ id,
+ name,
+ misfire_grace_time,
+ coalesce,
+ max_instances,
+ next_run_time,
+ jobstore,
+ executor,
+ True,
+ **trigger_args,
+ )
+ return func
+
+ return inner
+
+ def modify_job(self, job_id, jobstore=None, **changes):
+ """
+ Modifies the properties of a single job.
+
+ Modifications are passed to this method as extra keyword arguments.
+
+ :param str|unicode job_id: the identifier of the job
+ :param str|unicode jobstore: alias of the job store that contains the job
+ :return Job: the relevant job instance
+
+ """
+ with self._jobstores_lock:
+ job, jobstore = self._lookup_job(job_id, jobstore)
+ job._modify(**changes)
+ if jobstore:
+ self._lookup_jobstore(jobstore).update_job(job)
+
+ self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED, job_id, jobstore))
+
+ # Wake up the scheduler since the job's next run time may have been changed
+ if self.state == STATE_RUNNING:
+ self.wakeup()
+
+ return job
+
+ def reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
+ """
+ Constructs a new trigger for a job and updates its next run time.
+
+ Extra keyword arguments are passed directly to the trigger's constructor.
+
+ :param str|unicode job_id: the identifier of the job
+ :param str|unicode jobstore: alias of the job store that contains the job
+ :param trigger: alias of the trigger type or a trigger instance
+ :return Job: the relevant job instance
+
+ """
+ trigger = self._create_trigger(trigger, trigger_args)
+ now = datetime.now(self.timezone)
+ next_run_time = trigger.get_next_fire_time(None, now)
+ return self.modify_job(
+ job_id, jobstore, trigger=trigger, next_run_time=next_run_time
+ )
+
+ def pause_job(self, job_id, jobstore=None):
+ """
+ Causes the given job not to be executed until it is explicitly resumed.
+
+ :param str|unicode job_id: the identifier of the job
+ :param str|unicode jobstore: alias of the job store that contains the job
+ :return Job: the relevant job instance
+
+ """
+ return self.modify_job(job_id, jobstore, next_run_time=None)
+
+ def resume_job(self, job_id, jobstore=None):
+ """
+ Resumes the schedule of the given job, or removes the job if its schedule is finished.
+
+ :param str|unicode job_id: the identifier of the job
+ :param str|unicode jobstore: alias of the job store that contains the job
+ :return Job|None: the relevant job instance if the job was rescheduled, or ``None`` if no
+ next run time could be calculated and the job was removed
+
+ """
+ with self._jobstores_lock:
+ job, jobstore = self._lookup_job(job_id, jobstore)
+ now = datetime.now(self.timezone)
+ next_run_time = job.trigger.get_next_fire_time(None, now)
+ if next_run_time:
+ return self.modify_job(job_id, jobstore, next_run_time=next_run_time)
+ else:
+ self.remove_job(job.id, jobstore)
+
+ def get_jobs(self, jobstore=None, pending=None):
+ """
+ Returns a list of pending jobs (if the scheduler hasn't been started yet) and scheduled
+ jobs, either from a specific job store or from all of them.
+
+ If the scheduler has not been started yet, only pending jobs can be returned because the
+ job stores haven't been started yet either.
+
+ :param str|unicode jobstore: alias of the job store
+ :param bool pending: **DEPRECATED**
+ :rtype: list[Job]
+
+ """
+ if pending is not None:
+ warnings.warn(
+ 'The "pending" option is deprecated -- get_jobs() always returns '
+ "scheduled jobs if the scheduler has been started and pending jobs "
+ "otherwise",
+ DeprecationWarning,
+ )
+
+ with self._jobstores_lock:
+ jobs = []
+ if self.state == STATE_STOPPED:
+ for job, alias, replace_existing in self._pending_jobs:
+ if jobstore is None or alias == jobstore:
+ jobs.append(job)
+ else:
+ for alias, store in self._jobstores.items():
+ if jobstore is None or alias == jobstore:
+ jobs.extend(store.get_all_jobs())
+
+ return jobs
+
+ def get_job(self, job_id, jobstore=None):
+ """
+ Returns the Job that matches the given ``job_id``.
+
+ :param str|unicode job_id: the identifier of the job
+ :param str|unicode jobstore: alias of the job store that most likely contains the job
+ :return: the Job by the given ID, or ``None`` if it wasn't found
+ :rtype: Job
+
+ """
+ with self._jobstores_lock:
+ try:
+ return self._lookup_job(job_id, jobstore)[0]
+ except JobLookupError:
+ return
+
+ def remove_job(self, job_id, jobstore=None):
+ """
+ Removes a job, preventing it from being run any more.
+
+ :param str|unicode job_id: the identifier of the job
+ :param str|unicode jobstore: alias of the job store that contains the job
+ :raises JobLookupError: if the job was not found
+
+ """
+ jobstore_alias = None
+ with self._jobstores_lock:
+ # Check if the job is among the pending jobs
+ if self.state == STATE_STOPPED:
+ for i, (job, alias, replace_existing) in enumerate(self._pending_jobs):
+ if job.id == job_id and jobstore in (None, alias):
+ del self._pending_jobs[i]
+ jobstore_alias = alias
+ break
+ else:
+ # Otherwise, try to remove it from each store until it succeeds or we run out of
+ # stores to check
+ for alias, store in self._jobstores.items():
+ if jobstore in (None, alias):
+ try:
+ store.remove_job(job_id)
+ jobstore_alias = alias
+ break
+ except JobLookupError:
+ continue
+
+ if jobstore_alias is None:
+ raise JobLookupError(job_id)
+
+ # Notify listeners that a job has been removed
+ event = JobEvent(EVENT_JOB_REMOVED, job_id, jobstore_alias)
+ self._dispatch_event(event)
+
+ self._logger.info("Removed job %s", job_id)
+
+ def remove_all_jobs(self, jobstore=None):
+ """
+ Removes all jobs from the specified job store, or all job stores if none is given.
+
+ :param str|unicode jobstore: alias of the job store
+
+ """
+ with self._jobstores_lock:
+ if self.state == STATE_STOPPED:
+ if jobstore:
+ self._pending_jobs = [
+ pending
+ for pending in self._pending_jobs
+ if pending[1] != jobstore
+ ]
+ else:
+ self._pending_jobs = []
+ else:
+ for alias, store in self._jobstores.items():
+ if jobstore in (None, alias):
+ store.remove_all_jobs()
+
+ self._dispatch_event(SchedulerEvent(EVENT_ALL_JOBS_REMOVED, jobstore))
+
+ def print_jobs(self, jobstore=None, out=None):
+ """
+ print_jobs(jobstore=None, out=sys.stdout)
+
+ Prints out a textual listing of all jobs currently scheduled on either all job stores or
+ just a specific one.
+
+ :param str|unicode jobstore: alias of the job store, ``None`` to list jobs from all stores
+ :param file out: a file-like object to print to (defaults to **sys.stdout** if nothing is
+ given)
+
+ """
+ out = out or sys.stdout
+ with self._jobstores_lock:
+ if self.state == STATE_STOPPED:
+ print("Pending jobs:", file=out)
+ if self._pending_jobs:
+ for job, jobstore_alias, replace_existing in self._pending_jobs:
+ if jobstore in (None, jobstore_alias):
+ print(f" {job}", file=out)
+ else:
+ print(" No pending jobs", file=out)
+ else:
+ for alias, store in sorted(self._jobstores.items()):
+ if jobstore in (None, alias):
+ print(f"Jobstore {alias}:", file=out)
+ jobs = store.get_all_jobs()
+ if jobs:
+ for job in jobs:
+ print(f" {job}", file=out)
+ else:
+ print(" No scheduled jobs", file=out)
+
+ def export_jobs(self, outfile, jobstore=None):
+ """
+ Export stored jobs as JSON.
+
+ :param outfile: either a file object opened in text write mode ("w"), or a path
+ to the target file
+ :param jobstore: alias of the job store to export jobs from (if omitted, export
+ from all configured job stores)
+
+ """
+ import json
+ import pickle
+ from base64 import b64encode
+
+ from apscheduler import version
+
+ if self.state == STATE_STOPPED:
+ raise RuntimeError(
+ "the scheduler must have been started for job export to work"
+ )
+
+ def encode_with_pickle(obj):
+ return b64encode(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)).decode("ascii")
+
+ def json_default(obj):
+ if hasattr(obj, "__getstate__") and hasattr(obj, "__setstate__"):
+ state = obj.__getstate__()
+ if isinstance(state, Mapping):
+ return {
+ "__apscheduler_class__": obj_to_ref(obj.__class__),
+ "__apscheduler_state__": state,
+ }
+
+ return {"__apscheduler_pickle__": encode_with_pickle(obj)}
+
+ with self._jobstores_lock:
+ all_jobs = [
+ job
+ for alias, store in self._jobstores.items()
+ for job in store.get_all_jobs()
+ if jobstore in (None, alias)
+ ]
+
+ with ExitStack() as stack:
+ if not hasattr(outfile, "write"):
+ outfile = stack.enter_context(open(outfile, "w"))
+
+ json.dump(
+ {
+ "version": 1,
+ "scheduler_version": version,
+ "jobs": [job.__getstate__() for job in all_jobs],
+ },
+ outfile,
+ default=json_default,
+ )
+
+ def import_jobs(self, infile, jobstore="default"):
+ """
+ Import jobs previously exported via :meth:`export_jobs.
+
+ :param infile: either a file object opened in text read mode ("r") or a path to
+ a JSON file containing previously exported jobs
+ :param jobstore: the alias of the job store to import the jobs to
+
+ """
+ import json
+ import pickle
+ from base64 import b64decode
+
+ def json_object_hook(dct):
+ if pickle_data := dct.get("__apscheduler_pickle__"):
+ return pickle.loads(b64decode(pickle_data))
+
+ if obj_class := dct.get("__apscheduler_class__"):
+ if obj_state := dct.get("__apscheduler_state__"):
+ obj_class = ref_to_obj(obj_class)
+ obj = object.__new__(obj_class)
+ obj.__setstate__(obj_state)
+ return obj
+
+ return dct
+
+ jobstore = self._jobstores[jobstore]
+ with ExitStack() as stack:
+ if not hasattr(infile, "read"):
+ infile = stack.enter_context(open(infile))
+
+ data = json.load(infile, object_hook=json_object_hook)
+ if not isinstance(data, dict):
+ raise ValueError()
+
+ if (version := data.get("version", None)) != 1:
+ raise ValueError(f"unrecognized version: {version}")
+
+ for job_state in data["jobs"]:
+ job = object.__new__(Job)
+ job.__setstate__(job_state)
+ jobstore.add_job(job)
+
+ @abstractmethod
+ def wakeup(self):
+ """
+ Notifies the scheduler that there may be jobs due for execution.
+ Triggers :meth:`_process_jobs` to be run in an implementation specific manner.
+ """
+
+ #
+ # Private API
+ #
+
+ def _configure(self, config):
+ # Set general options
+ self._logger = maybe_ref(config.pop("logger", None)) or getLogger(
+ "apscheduler.scheduler"
+ )
+ self.timezone = astimezone(config.pop("timezone", None)) or get_localzone()
+ self.jobstore_retry_interval = float(config.pop("jobstore_retry_interval", 10))
+
+ # Set the job defaults
+ job_defaults = config.get("job_defaults", {})
+ self._job_defaults = {
+ "misfire_grace_time": asint(job_defaults.get("misfire_grace_time", 1)),
+ "coalesce": asbool(job_defaults.get("coalesce", True)),
+ "max_instances": asint(job_defaults.get("max_instances", 1)),
+ }
+
+ # Configure executors
+ self._executors.clear()
+ for alias, value in config.get("executors", {}).items():
+ if isinstance(value, BaseExecutor):
+ self.add_executor(value, alias)
+ elif isinstance(value, MutableMapping):
+ executor_class = value.pop("class", None)
+ plugin = value.pop("type", None)
+ if plugin:
+ executor = self._create_plugin_instance("executor", plugin, value)
+ elif executor_class:
+ cls = maybe_ref(executor_class)
+ executor = cls(**value)
+ else:
+ raise ValueError(
+ f'Cannot create executor "{alias}" -- either "type" or "class" must be defined'
+ )
+
+ self.add_executor(executor, alias)
+ else:
+ raise TypeError(
+ f"Expected executor instance or dict for executors['{alias}'], got {value.__class__.__name__} instead"
+ )
+
+ # Configure job stores
+ self._jobstores.clear()
+ for alias, value in config.get("jobstores", {}).items():
+ if isinstance(value, BaseJobStore):
+ self.add_jobstore(value, alias)
+ elif isinstance(value, MutableMapping):
+ jobstore_class = value.pop("class", None)
+ plugin = value.pop("type", None)
+ if plugin:
+ jobstore = self._create_plugin_instance("jobstore", plugin, value)
+ elif jobstore_class:
+ cls = maybe_ref(jobstore_class)
+ jobstore = cls(**value)
+ else:
+ raise ValueError(
+ f'Cannot create job store "{alias}" -- either "type" or "class" must be '
+ "defined"
+ )
+
+ self.add_jobstore(jobstore, alias)
+ else:
+ raise TypeError(
+ f"Expected job store instance or dict for jobstores['{alias}'], got {value.__class__.__name__} instead"
+ )
+
+ def _create_default_executor(self):
+ """Creates a default executor store, specific to the particular scheduler type."""
+ return ThreadPoolExecutor()
+
+ def _create_default_jobstore(self):
+ """Creates a default job store, specific to the particular scheduler type."""
+ return MemoryJobStore()
+
+ def _lookup_executor(self, alias):
+ """
+ Returns the executor instance by the given name from the list of executors that were added
+ to this scheduler.
+
+ :type alias: str
+ :raises KeyError: if no executor by the given alias is not found
+
+ """
+ try:
+ return self._executors[alias]
+ except KeyError:
+ raise KeyError(f"No such executor: {alias}")
+
+ def _lookup_jobstore(self, alias):
+ """
+ Returns the job store instance by the given name from the list of job stores that were
+ added to this scheduler.
+
+ :type alias: str
+ :raises KeyError: if no job store by the given alias is not found
+
+ """
+ try:
+ return self._jobstores[alias]
+ except KeyError:
+ raise KeyError(f"No such job store: {alias}")
+
+ def _lookup_job(self, job_id, jobstore_alias):
+ """
+ Finds a job by its ID.
+
+ :type job_id: str
+ :param str jobstore_alias: alias of a job store to look in
+ :return tuple[Job, str]: a tuple of job, jobstore alias (jobstore alias is None in case of
+ a pending job)
+ :raises JobLookupError: if no job by the given ID is found.
+
+ """
+ if self.state == STATE_STOPPED:
+ # Check if the job is among the pending jobs
+ for job, alias, replace_existing in self._pending_jobs:
+ if job.id == job_id:
+ return job, None
+ else:
+ # Look in all job stores
+ for alias, store in self._jobstores.items():
+ if jobstore_alias in (None, alias):
+ job = store.lookup_job(job_id)
+ if job is not None:
+ return job, alias
+
+ raise JobLookupError(job_id)
+
+ def _dispatch_event(self, event):
+ """
+ Dispatches the given event to interested listeners.
+
+ :param SchedulerEvent event: the event to send
+
+ """
+ with self._listeners_lock:
+ listeners = tuple(self._listeners)
+
+ for cb, mask in listeners:
+ if event.code & mask:
+ try:
+ cb(event)
+ except BaseException:
+ self._logger.exception("Error notifying listener")
+
+ def _check_uwsgi(self):
+ """Check if we're running under uWSGI with threads disabled."""
+ uwsgi_module = sys.modules.get("uwsgi")
+ if not getattr(uwsgi_module, "has_threads", True):
+ raise RuntimeError(
+ "The scheduler seems to be running under uWSGI, but threads have "
+ "been disabled. You must run uWSGI with the --enable-threads "
+ "option for the scheduler to work."
+ )
+
+ def _real_add_job(self, job, jobstore_alias, replace_existing):
+ """
+ :param Job job: the job to add
+ :param bool replace_existing: ``True`` to use update_job() in case the job already exists
+ in the store
+
+ """
+ # Fill in undefined values with defaults
+ replacements = {}
+ for key, value in self._job_defaults.items():
+ if not hasattr(job, key):
+ replacements[key] = value
+
+ # Calculate the next run time if there is none defined
+ if not hasattr(job, "next_run_time"):
+ now = datetime.now(self.timezone)
+ replacements["next_run_time"] = job.trigger.get_next_fire_time(None, now)
+
+ # Apply any replacements
+ job._modify(**replacements)
+
+ # Add the job to the given job store
+ store = self._lookup_jobstore(jobstore_alias)
+ try:
+ store.add_job(job)
+ except ConflictingIdError:
+ if replace_existing:
+ store.update_job(job)
+ else:
+ raise
+
+ # Mark the job as no longer pending
+ job._jobstore_alias = jobstore_alias
+
+ # Notify listeners that a new job has been added
+ event = JobEvent(EVENT_JOB_ADDED, job.id, jobstore_alias)
+ self._dispatch_event(event)
+
+ self._logger.info('Added job "%s" to job store "%s"', job.name, jobstore_alias)
+
+ # Notify the scheduler about the new job
+ if self.state == STATE_RUNNING:
+ self.wakeup()
+
+ def _create_plugin_instance(self, type_, alias, constructor_kwargs):
+ """Creates an instance of the given plugin type, loading the plugin first if necessary."""
+ plugin_container, class_container, base_class = {
+ "trigger": (self._trigger_plugins, self._trigger_classes, BaseTrigger),
+ "jobstore": (self._jobstore_plugins, self._jobstore_classes, BaseJobStore),
+ "executor": (self._executor_plugins, self._executor_classes, BaseExecutor),
+ }[type_]
+
+ try:
+ plugin_cls = class_container[alias]
+ except KeyError:
+ if alias in plugin_container:
+ plugin_cls = class_container[alias] = plugin_container[alias].load()
+ if not issubclass(plugin_cls, base_class):
+ raise TypeError(
+ f"The {type_} entry point does not point to a {type_} class"
+ )
+ else:
+ raise LookupError(f'No {type_} by the name "{alias}" was found')
+
+ return plugin_cls(**constructor_kwargs)
+
+ def _create_trigger(self, trigger, trigger_args):
+ if isinstance(trigger, BaseTrigger):
+ return trigger
+ elif trigger is None:
+ trigger = "date"
+ elif not isinstance(trigger, str):
+ raise TypeError(
+ f"Expected a trigger instance or string, got {trigger.__class__.__name__} instead"
+ )
+
+ # Use the scheduler's time zone if nothing else is specified
+ trigger_args.setdefault("timezone", self.timezone)
+
+ # Instantiate the trigger class
+ return self._create_plugin_instance("trigger", trigger, trigger_args)
+
+ def _create_lock(self):
+ """Creates a reentrant lock object."""
+ return RLock()
+
+ def _process_jobs(self):
+ """
+ Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
+ to wait for the next round.
+
+ If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least
+ ``jobstore_retry_interval`` seconds.
+
+ """
+ if self.state == STATE_PAUSED:
+ self._logger.debug("Scheduler is paused -- not processing jobs")
+ return None
+
+ self._logger.debug("Looking for jobs to run")
+ now = datetime.now(self.timezone)
+ next_wakeup_time = None
+ events = []
+
+ with self._jobstores_lock:
+ for jobstore_alias, jobstore in self._jobstores.items():
+ try:
+ due_jobs = jobstore.get_due_jobs(now)
+ except Exception as e:
+ # Schedule a wakeup at least in jobstore_retry_interval seconds
+ self._logger.warning(
+ "Error getting due jobs from job store %r: %s",
+ jobstore_alias,
+ e,
+ )
+ retry_wakeup_time = now + timedelta(
+ seconds=self.jobstore_retry_interval
+ )
+ if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
+ next_wakeup_time = retry_wakeup_time
+
+ continue
+
+ for job in due_jobs:
+ # Look up the job's executor
+ try:
+ executor = self._lookup_executor(job.executor)
+ except BaseException:
+ self._logger.error(
+ 'Executor lookup ("%s") failed for job "%s" -- removing it from the '
+ "job store",
+ job.executor,
+ job,
+ )
+ self.remove_job(job.id, jobstore_alias)
+ continue
+
+ run_times = job._get_run_times(now)
+ run_times = (
+ run_times[-1:] if run_times and job.coalesce else run_times
+ )
+ if run_times:
+ try:
+ executor.submit_job(job, run_times)
+ except MaxInstancesReachedError:
+ self._logger.warning(
+ 'Execution of job "%s" skipped: maximum number of running '
+ "instances reached (%d)",
+ job,
+ job.max_instances,
+ )
+ event = JobSubmissionEvent(
+ EVENT_JOB_MAX_INSTANCES,
+ job.id,
+ jobstore_alias,
+ run_times,
+ )
+ events.append(event)
+ except BaseException:
+ self._logger.exception(
+ 'Error submitting job "%s" to executor "%s"',
+ job,
+ job.executor,
+ )
+ else:
+ event = JobSubmissionEvent(
+ EVENT_JOB_SUBMITTED, job.id, jobstore_alias, run_times
+ )
+ events.append(event)
+
+ # Update the job if it has a next execution time.
+ # Otherwise remove it from the job store.
+ job_next_run = job.trigger.get_next_fire_time(
+ run_times[-1], now
+ )
+ if job_next_run:
+ job._modify(next_run_time=job_next_run)
+ jobstore.update_job(job)
+ else:
+ self.remove_job(job.id, jobstore_alias)
+
+ # Set a new next wakeup time if there isn't one yet or
+ # the jobstore has an even earlier one
+ jobstore_next_run_time = jobstore.get_next_run_time()
+ if jobstore_next_run_time and (
+ next_wakeup_time is None
+ or jobstore_next_run_time < next_wakeup_time
+ ):
+ next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)
+
+ # Dispatch collected events
+ for event in events:
+ self._dispatch_event(event)
+
+ # Determine the delay until this method should be called again
+ if self.state == STATE_PAUSED:
+ wait_seconds = None
+ self._logger.debug("Scheduler is paused; waiting until resume() is called")
+ elif next_wakeup_time is None:
+ wait_seconds = None
+ self._logger.debug("No jobs; waiting until a job is added")
+ else:
+ now = datetime.now(self.timezone)
+ wait_seconds = min(
+ max((next_wakeup_time - now).total_seconds(), 0), TIMEOUT_MAX
+ )
+ self._logger.debug(
+ "Next wakeup is due at %s (in %f seconds)",
+ next_wakeup_time,
+ wait_seconds,
+ )
+
+ return wait_seconds