aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/apscheduler/jobstores
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/apscheduler/jobstores
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/jobstores')
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/base.py141
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/etcd.py170
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/memory.py106
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/mongodb.py158
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py160
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py173
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/sqlalchemy.py194
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/zookeeper.py197
9 files changed, 1299 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/__init__.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/base.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/base.py
new file mode 100644
index 00000000..01cabd1e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/base.py
@@ -0,0 +1,141 @@
+import logging
+from abc import ABCMeta, abstractmethod
+
+
+class JobLookupError(KeyError):
+ """Raised when the job store cannot find a job for update or removal."""
+
+ def __init__(self, job_id):
+ super().__init__(f"No job by the id of {job_id} was found")
+
+
+class ConflictingIdError(KeyError):
+ """Raised when the uniqueness of job IDs is being violated."""
+
+ def __init__(self, job_id):
+ super().__init__(f"Job identifier ({job_id}) conflicts with an existing job")
+
+
+class TransientJobError(ValueError):
+ """
+ Raised when an attempt to add transient (with no func_ref) job to a persistent job store is
+ detected.
+ """
+
+ def __init__(self, job_id):
+ super().__init__(
+ f"Job ({job_id}) cannot be added to this job store because a reference to the callable "
+ "could not be determined."
+ )
+
+
+class BaseJobStore(metaclass=ABCMeta):
+ """Abstract base class that defines the interface that every job store must implement."""
+
+ _scheduler = None
+ _alias = None
+ _logger = logging.getLogger("apscheduler.jobstores")
+
+ def start(self, scheduler, alias):
+ """
+ Called by the scheduler when the scheduler is being started or when the job store is being
+ added to an already running scheduler.
+
+ :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting
+ this job store
+ :param str|unicode alias: alias of this job store as it was assigned to the scheduler
+ """
+
+ self._scheduler = scheduler
+ self._alias = alias
+ self._logger = logging.getLogger(f"apscheduler.jobstores.{alias}")
+
+ def shutdown(self):
+ """Frees any resources still bound to this job store."""
+
+ def _fix_paused_jobs_sorting(self, jobs):
+ for i, job in enumerate(jobs):
+ if job.next_run_time is not None:
+ if i > 0:
+ paused_jobs = jobs[:i]
+ del jobs[:i]
+ jobs.extend(paused_jobs)
+ break
+
+ @abstractmethod
+ def lookup_job(self, job_id):
+ """
+ Returns a specific job, or ``None`` if it isn't found..
+
+ The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of
+ the returned job to point to the scheduler and itself, respectively.
+
+ :param str|unicode job_id: identifier of the job
+ :rtype: Job
+ """
+
+ @abstractmethod
+ def get_due_jobs(self, now):
+ """
+ Returns the list of jobs that have ``next_run_time`` earlier or equal to ``now``.
+ The returned jobs must be sorted by next run time (ascending).
+
+ :param datetime.datetime now: the current (timezone aware) datetime
+ :rtype: list[Job]
+ """
+
+ @abstractmethod
+ def get_next_run_time(self):
+ """
+ Returns the earliest run time of all the jobs stored in this job store, or ``None`` if
+ there are no active jobs.
+
+ :rtype: datetime.datetime
+ """
+
+ @abstractmethod
+ def get_all_jobs(self):
+ """
+ Returns a list of all jobs in this job store.
+ The returned jobs should be sorted by next run time (ascending).
+ Paused jobs (next_run_time == None) should be sorted last.
+
+ The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of
+ the returned jobs to point to the scheduler and itself, respectively.
+
+ :rtype: list[Job]
+ """
+
+ @abstractmethod
+ def add_job(self, job):
+ """
+ Adds the given job to this store.
+
+ :param Job job: the job to add
+ :raises ConflictingIdError: if there is another job in this store with the same ID
+ """
+
+ @abstractmethod
+ def update_job(self, job):
+ """
+ Replaces the job in the store with the given newer version.
+
+ :param Job job: the job to update
+ :raises JobLookupError: if the job does not exist
+ """
+
+ @abstractmethod
+ def remove_job(self, job_id):
+ """
+ Removes the given job from this store.
+
+ :param str|unicode job_id: identifier of the job
+ :raises JobLookupError: if the job does not exist
+ """
+
+ @abstractmethod
+ def remove_all_jobs(self):
+ """Removes all jobs from this store."""
+
+ def __repr__(self):
+ return f"<{self.__class__.__name__}>"
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/etcd.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/etcd.py
new file mode 100644
index 00000000..3fe74ff1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/etcd.py
@@ -0,0 +1,170 @@
+import pickle
+from datetime import datetime, timezone
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import (
+ datetime_to_utc_timestamp,
+ maybe_ref,
+ utc_timestamp_to_datetime,
+)
+
+try:
+ from etcd3 import Etcd3Client
+except ImportError as exc: # pragma: nocover
+ raise ImportError("EtcdJobStore requires etcd3 be installed") from exc
+
+
+class EtcdJobStore(BaseJobStore):
+ """
+ Stores jobs in a etcd. Any leftover keyword arguments are directly passed to
+ etcd3's `etcd3.client
+ <https://python-etcd3.readthedocs.io/en/latest/readme.html>`_.
+
+ Plugin alias: ``etcd``
+
+ :param str path: path to store jobs in
+ :param client: a :class:`~etcd3.client.etcd3` instance to use instead of
+ providing connection arguments
+ :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+ highest available
+ """
+
+ def __init__(
+ self,
+ path="/apscheduler",
+ client=None,
+ close_connection_on_exit=False,
+ pickle_protocol=pickle.DEFAULT_PROTOCOL,
+ **connect_args,
+ ):
+ super().__init__()
+ self.pickle_protocol = pickle_protocol
+ self.close_connection_on_exit = close_connection_on_exit
+
+ if not path:
+ raise ValueError('The "path" parameter must not be empty')
+
+ self.path = path
+
+ if client:
+ self.client = maybe_ref(client)
+ else:
+ self.client = Etcd3Client(**connect_args)
+
+ def lookup_job(self, job_id):
+ node_path = self.path + "/" + str(job_id)
+ try:
+ content, _ = self.client.get(node_path)
+ content = pickle.loads(content)
+ job = self._reconstitute_job(content["job_state"])
+ return job
+ except BaseException:
+ return None
+
+ def get_due_jobs(self, now):
+ timestamp = datetime_to_utc_timestamp(now)
+ jobs = [
+ job_record["job"]
+ for job_record in self._get_jobs()
+ if job_record["next_run_time"] is not None
+ and job_record["next_run_time"] <= timestamp
+ ]
+ return jobs
+
+ def get_next_run_time(self):
+ next_runs = [
+ job_record["next_run_time"]
+ for job_record in self._get_jobs()
+ if job_record["next_run_time"] is not None
+ ]
+ return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None
+
+ def get_all_jobs(self):
+ jobs = [job_record["job"] for job_record in self._get_jobs()]
+ self._fix_paused_jobs_sorting(jobs)
+ return jobs
+
+ def add_job(self, job):
+ node_path = self.path + "/" + str(job.id)
+ value = {
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": job.__getstate__(),
+ }
+ data = pickle.dumps(value, self.pickle_protocol)
+ status = self.client.put_if_not_exists(node_path, value=data)
+ if not status:
+ raise ConflictingIdError(job.id)
+
+ def update_job(self, job):
+ node_path = self.path + "/" + str(job.id)
+ changes = {
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": job.__getstate__(),
+ }
+ data = pickle.dumps(changes, self.pickle_protocol)
+ status, _ = self.client.transaction(
+ compare=[self.client.transactions.version(node_path) > 0],
+ success=[self.client.transactions.put(node_path, value=data)],
+ failure=[],
+ )
+ if not status:
+ raise JobLookupError(job.id)
+
+ def remove_job(self, job_id):
+ node_path = self.path + "/" + str(job_id)
+ status, _ = self.client.transaction(
+ compare=[self.client.transactions.version(node_path) > 0],
+ success=[self.client.transactions.delete(node_path)],
+ failure=[],
+ )
+ if not status:
+ raise JobLookupError(job_id)
+
+ def remove_all_jobs(self):
+ self.client.delete_prefix(self.path)
+
+ def shutdown(self):
+ self.client.close()
+
+ def _reconstitute_job(self, job_state):
+ job_state = job_state
+ job = Job.__new__(Job)
+ job.__setstate__(job_state)
+ job._scheduler = self._scheduler
+ job._jobstore_alias = self._alias
+ return job
+
+ def _get_jobs(self):
+ jobs = []
+ failed_job_ids = []
+ all_ids = list(self.client.get_prefix(self.path))
+
+ for doc, _ in all_ids:
+ try:
+ content = pickle.loads(doc)
+ job_record = {
+ "next_run_time": content["next_run_time"],
+ "job": self._reconstitute_job(content["job_state"]),
+ }
+ jobs.append(job_record)
+ except BaseException:
+ content = pickle.loads(doc)
+ failed_id = content["job_state"]["id"]
+ failed_job_ids.append(failed_id)
+ self._logger.exception(
+ 'Unable to restore job "%s" -- removing it', failed_id
+ )
+
+ if failed_job_ids:
+ for failed_id in failed_job_ids:
+ self.remove_job(failed_id)
+ paused_sort_key = datetime(9999, 12, 31, tzinfo=timezone.utc)
+ return sorted(
+ jobs,
+ key=lambda job_record: job_record["job"].next_run_time or paused_sort_key,
+ )
+
+ def __repr__(self):
+ self._logger.exception("<%s (client=%s)>", self.__class__.__name__, self.client)
+ return f"<{self.__class__.__name__} (client={self.client})>"
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/memory.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/memory.py
new file mode 100644
index 00000000..8103cfdc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/memory.py
@@ -0,0 +1,106 @@
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import datetime_to_utc_timestamp
+
+
+class MemoryJobStore(BaseJobStore):
+ """
+ Stores jobs in an array in RAM. Provides no persistence support.
+
+ Plugin alias: ``memory``
+ """
+
+ def __init__(self):
+ super().__init__()
+ # list of (job, timestamp), sorted by next_run_time and job id (ascending)
+ self._jobs = []
+ self._jobs_index = {} # id -> (job, timestamp) lookup table
+
+ def lookup_job(self, job_id):
+ return self._jobs_index.get(job_id, (None, None))[0]
+
+ def get_due_jobs(self, now):
+ now_timestamp = datetime_to_utc_timestamp(now)
+ pending = []
+ for job, timestamp in self._jobs:
+ if timestamp is None or timestamp > now_timestamp:
+ break
+ pending.append(job)
+
+ return pending
+
+ def get_next_run_time(self):
+ return self._jobs[0][0].next_run_time if self._jobs else None
+
+ def get_all_jobs(self):
+ return [j[0] for j in self._jobs]
+
+ def add_job(self, job):
+ if job.id in self._jobs_index:
+ raise ConflictingIdError(job.id)
+
+ timestamp = datetime_to_utc_timestamp(job.next_run_time)
+ index = self._get_job_index(timestamp, job.id)
+ self._jobs.insert(index, (job, timestamp))
+ self._jobs_index[job.id] = (job, timestamp)
+
+ def update_job(self, job):
+ old_job, old_timestamp = self._jobs_index.get(job.id, (None, None))
+ if old_job is None:
+ raise JobLookupError(job.id)
+
+ # If the next run time has not changed, simply replace the job in its present index.
+ # Otherwise, reinsert the job to the list to preserve the ordering.
+ old_index = self._get_job_index(old_timestamp, old_job.id)
+ new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
+ if old_timestamp == new_timestamp:
+ self._jobs[old_index] = (job, new_timestamp)
+ else:
+ del self._jobs[old_index]
+ new_index = self._get_job_index(new_timestamp, job.id)
+ self._jobs.insert(new_index, (job, new_timestamp))
+
+ self._jobs_index[old_job.id] = (job, new_timestamp)
+
+ def remove_job(self, job_id):
+ job, timestamp = self._jobs_index.get(job_id, (None, None))
+ if job is None:
+ raise JobLookupError(job_id)
+
+ index = self._get_job_index(timestamp, job_id)
+ del self._jobs[index]
+ del self._jobs_index[job.id]
+
+ def remove_all_jobs(self):
+ self._jobs = []
+ self._jobs_index = {}
+
+ def shutdown(self):
+ self.remove_all_jobs()
+
+ def _get_job_index(self, timestamp, job_id):
+ """
+ Returns the index of the given job, or if it's not found, the index where the job should be
+ inserted based on the given timestamp.
+
+ :type timestamp: int
+ :type job_id: str
+
+ """
+ lo, hi = 0, len(self._jobs)
+ timestamp = float("inf") if timestamp is None else timestamp
+ while lo < hi:
+ mid = (lo + hi) // 2
+ mid_job, mid_timestamp = self._jobs[mid]
+ mid_timestamp = float("inf") if mid_timestamp is None else mid_timestamp
+ if mid_timestamp > timestamp:
+ hi = mid
+ elif mid_timestamp < timestamp:
+ lo = mid + 1
+ elif mid_job.id > job_id:
+ hi = mid
+ elif mid_job.id < job_id:
+ lo = mid + 1
+ else:
+ return mid
+
+ return lo
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/mongodb.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/mongodb.py
new file mode 100644
index 00000000..102c0bd0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/mongodb.py
@@ -0,0 +1,158 @@
+import pickle
+import warnings
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import (
+ datetime_to_utc_timestamp,
+ maybe_ref,
+ utc_timestamp_to_datetime,
+)
+
+try:
+ from bson.binary import Binary
+ from pymongo import ASCENDING, MongoClient
+ from pymongo.errors import DuplicateKeyError
+except ImportError as exc: # pragma: nocover
+ raise ImportError("MongoDBJobStore requires PyMongo installed") from exc
+
+
+class MongoDBJobStore(BaseJobStore):
+ """
+ Stores jobs in a MongoDB database. Any leftover keyword arguments are directly passed to
+ pymongo's `MongoClient
+ <http://api.mongodb.org/python/current/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient>`_.
+
+ Plugin alias: ``mongodb``
+
+ :param str database: database to store jobs in
+ :param str collection: collection to store jobs in
+ :param client: a :class:`~pymongo.mongo_client.MongoClient` instance to use instead of
+ providing connection arguments
+ :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+ highest available
+ """
+
+ def __init__(
+ self,
+ database="apscheduler",
+ collection="jobs",
+ client=None,
+ pickle_protocol=pickle.HIGHEST_PROTOCOL,
+ **connect_args,
+ ):
+ super().__init__()
+ self.pickle_protocol = pickle_protocol
+
+ if not database:
+ raise ValueError('The "database" parameter must not be empty')
+ if not collection:
+ raise ValueError('The "collection" parameter must not be empty')
+
+ if client:
+ self.client = maybe_ref(client)
+ else:
+ connect_args.setdefault("w", 1)
+ self.client = MongoClient(**connect_args)
+
+ self.collection = self.client[database][collection]
+
+ def start(self, scheduler, alias):
+ super().start(scheduler, alias)
+ self.collection.create_index("next_run_time", sparse=True)
+
+ @property
+ def connection(self):
+ warnings.warn(
+ 'The "connection" member is deprecated -- use "client" instead',
+ DeprecationWarning,
+ )
+ return self.client
+
+ def lookup_job(self, job_id):
+ document = self.collection.find_one(job_id, ["job_state"])
+ return self._reconstitute_job(document["job_state"]) if document else None
+
+ def get_due_jobs(self, now):
+ timestamp = datetime_to_utc_timestamp(now)
+ return self._get_jobs({"next_run_time": {"$lte": timestamp}})
+
+ def get_next_run_time(self):
+ document = self.collection.find_one(
+ {"next_run_time": {"$ne": None}},
+ projection=["next_run_time"],
+ sort=[("next_run_time", ASCENDING)],
+ )
+ return (
+ utc_timestamp_to_datetime(document["next_run_time"]) if document else None
+ )
+
+ def get_all_jobs(self):
+ jobs = self._get_jobs({})
+ self._fix_paused_jobs_sorting(jobs)
+ return jobs
+
+ def add_job(self, job):
+ try:
+ self.collection.insert_one(
+ {
+ "_id": job.id,
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": Binary(
+ pickle.dumps(job.__getstate__(), self.pickle_protocol)
+ ),
+ }
+ )
+ except DuplicateKeyError:
+ raise ConflictingIdError(job.id)
+
+ def update_job(self, job):
+ changes = {
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)),
+ }
+ result = self.collection.update_one({"_id": job.id}, {"$set": changes})
+ if result and result.matched_count == 0:
+ raise JobLookupError(job.id)
+
+ def remove_job(self, job_id):
+ result = self.collection.delete_one({"_id": job_id})
+ if result and result.deleted_count == 0:
+ raise JobLookupError(job_id)
+
+ def remove_all_jobs(self):
+ self.collection.delete_many({})
+
+ def shutdown(self):
+ self.client.close()
+
+ def _reconstitute_job(self, job_state):
+ job_state = pickle.loads(job_state)
+ job = Job.__new__(Job)
+ job.__setstate__(job_state)
+ job._scheduler = self._scheduler
+ job._jobstore_alias = self._alias
+ return job
+
+ def _get_jobs(self, conditions):
+ jobs = []
+ failed_job_ids = []
+ for document in self.collection.find(
+ conditions, ["_id", "job_state"], sort=[("next_run_time", ASCENDING)]
+ ):
+ try:
+ jobs.append(self._reconstitute_job(document["job_state"]))
+ except BaseException:
+ self._logger.exception(
+ 'Unable to restore job "%s" -- removing it', document["_id"]
+ )
+ failed_job_ids.append(document["_id"])
+
+ # Remove all the jobs we failed to restore
+ if failed_job_ids:
+ self.collection.delete_many({"_id": {"$in": failed_job_ids}})
+
+ return jobs
+
+ def __repr__(self):
+ return f"<{self.__class__.__name__} (client={self.client})>"
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py
new file mode 100644
index 00000000..528285fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py
@@ -0,0 +1,160 @@
+import pickle
+from datetime import datetime, timezone
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import datetime_to_utc_timestamp, utc_timestamp_to_datetime
+
+try:
+ from redis import Redis
+except ImportError as exc: # pragma: nocover
+ raise ImportError("RedisJobStore requires redis installed") from exc
+
+
+class RedisJobStore(BaseJobStore):
+ """
+ Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's
+ :class:`~redis.StrictRedis`.
+
+ Plugin alias: ``redis``
+
+ :param int db: the database number to store jobs in
+ :param str jobs_key: key to store jobs in
+ :param str run_times_key: key to store the jobs' run times in
+ :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+ highest available
+ """
+
+ def __init__(
+ self,
+ db=0,
+ jobs_key="apscheduler.jobs",
+ run_times_key="apscheduler.run_times",
+ pickle_protocol=pickle.HIGHEST_PROTOCOL,
+ **connect_args,
+ ):
+ super().__init__()
+
+ if db is None:
+ raise ValueError('The "db" parameter must not be empty')
+ if not jobs_key:
+ raise ValueError('The "jobs_key" parameter must not be empty')
+ if not run_times_key:
+ raise ValueError('The "run_times_key" parameter must not be empty')
+
+ self.pickle_protocol = pickle_protocol
+ self.jobs_key = jobs_key
+ self.run_times_key = run_times_key
+ self.redis = Redis(db=int(db), **connect_args)
+
+ def lookup_job(self, job_id):
+ job_state = self.redis.hget(self.jobs_key, job_id)
+ return self._reconstitute_job(job_state) if job_state else None
+
+ def get_due_jobs(self, now):
+ timestamp = datetime_to_utc_timestamp(now)
+ job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp)
+ if job_ids:
+ job_states = self.redis.hmget(self.jobs_key, *job_ids)
+ return self._reconstitute_jobs(zip(job_ids, job_states))
+ return []
+
+ def get_next_run_time(self):
+ next_run_time = self.redis.zrange(self.run_times_key, 0, 0, withscores=True)
+ if next_run_time:
+ return utc_timestamp_to_datetime(next_run_time[0][1])
+
+ def get_all_jobs(self):
+ job_states = self.redis.hgetall(self.jobs_key)
+ jobs = self._reconstitute_jobs(job_states.items())
+ paused_sort_key = datetime(9999, 12, 31, tzinfo=timezone.utc)
+ return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key)
+
+ def add_job(self, job):
+ if self.redis.hexists(self.jobs_key, job.id):
+ raise ConflictingIdError(job.id)
+
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.hset(
+ self.jobs_key,
+ job.id,
+ pickle.dumps(job.__getstate__(), self.pickle_protocol),
+ )
+ if job.next_run_time:
+ pipe.zadd(
+ self.run_times_key,
+ {job.id: datetime_to_utc_timestamp(job.next_run_time)},
+ )
+
+ pipe.execute()
+
+ def update_job(self, job):
+ if not self.redis.hexists(self.jobs_key, job.id):
+ raise JobLookupError(job.id)
+
+ with self.redis.pipeline() as pipe:
+ pipe.hset(
+ self.jobs_key,
+ job.id,
+ pickle.dumps(job.__getstate__(), self.pickle_protocol),
+ )
+ if job.next_run_time:
+ pipe.zadd(
+ self.run_times_key,
+ {job.id: datetime_to_utc_timestamp(job.next_run_time)},
+ )
+ else:
+ pipe.zrem(self.run_times_key, job.id)
+
+ pipe.execute()
+
+ def remove_job(self, job_id):
+ if not self.redis.hexists(self.jobs_key, job_id):
+ raise JobLookupError(job_id)
+
+ with self.redis.pipeline() as pipe:
+ pipe.hdel(self.jobs_key, job_id)
+ pipe.zrem(self.run_times_key, job_id)
+ pipe.execute()
+
+ def remove_all_jobs(self):
+ with self.redis.pipeline() as pipe:
+ pipe.delete(self.jobs_key)
+ pipe.delete(self.run_times_key)
+ pipe.execute()
+
+ def shutdown(self):
+ self.redis.connection_pool.disconnect()
+
+ def _reconstitute_job(self, job_state):
+ job_state = pickle.loads(job_state)
+ job = Job.__new__(Job)
+ job.__setstate__(job_state)
+ job._scheduler = self._scheduler
+ job._jobstore_alias = self._alias
+ return job
+
+ def _reconstitute_jobs(self, job_states):
+ jobs = []
+ failed_job_ids = []
+ for job_id, job_state in job_states:
+ try:
+ jobs.append(self._reconstitute_job(job_state))
+ except BaseException:
+ self._logger.exception(
+ 'Unable to restore job "%s" -- removing it', job_id
+ )
+ failed_job_ids.append(job_id)
+
+ # Remove all the jobs we failed to restore
+ if failed_job_ids:
+ with self.redis.pipeline() as pipe:
+ pipe.hdel(self.jobs_key, *failed_job_ids)
+ pipe.zrem(self.run_times_key, *failed_job_ids)
+ pipe.execute()
+
+ return jobs
+
+ def __repr__(self):
+ return f"<{self.__class__.__name__}>"
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py
new file mode 100644
index 00000000..d78290b1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py
@@ -0,0 +1,173 @@
+import pickle
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import (
+ datetime_to_utc_timestamp,
+ maybe_ref,
+ utc_timestamp_to_datetime,
+)
+
+try:
+ from rethinkdb import RethinkDB
+except ImportError as exc: # pragma: nocover
+ raise ImportError("RethinkDBJobStore requires rethinkdb installed") from exc
+
+
+class RethinkDBJobStore(BaseJobStore):
+ """
+ Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to
+ rethinkdb's `RethinkdbClient <http://www.rethinkdb.com/api/#connect>`_.
+
+ Plugin alias: ``rethinkdb``
+
+ :param str database: database to store jobs in
+ :param str collection: collection to store jobs in
+ :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing
+ connection arguments
+ :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+ highest available
+ """
+
+ def __init__(
+ self,
+ database="apscheduler",
+ table="jobs",
+ client=None,
+ pickle_protocol=pickle.HIGHEST_PROTOCOL,
+ **connect_args,
+ ):
+ super().__init__()
+
+ if not database:
+ raise ValueError('The "database" parameter must not be empty')
+ if not table:
+ raise ValueError('The "table" parameter must not be empty')
+
+ self.database = database
+ self.table_name = table
+ self.table = None
+ self.client = client
+ self.pickle_protocol = pickle_protocol
+ self.connect_args = connect_args
+ self.r = RethinkDB()
+ self.conn = None
+
+ def start(self, scheduler, alias):
+ super().start(scheduler, alias)
+
+ if self.client:
+ self.conn = maybe_ref(self.client)
+ else:
+ self.conn = self.r.connect(db=self.database, **self.connect_args)
+
+ if self.database not in self.r.db_list().run(self.conn):
+ self.r.db_create(self.database).run(self.conn)
+
+ if self.table_name not in self.r.table_list().run(self.conn):
+ self.r.table_create(self.table_name).run(self.conn)
+
+ if "next_run_time" not in self.r.table(self.table_name).index_list().run(
+ self.conn
+ ):
+ self.r.table(self.table_name).index_create("next_run_time").run(self.conn)
+
+ self.table = self.r.db(self.database).table(self.table_name)
+
+ def lookup_job(self, job_id):
+ results = list(self.table.get_all(job_id).pluck("job_state").run(self.conn))
+ return self._reconstitute_job(results[0]["job_state"]) if results else None
+
+ def get_due_jobs(self, now):
+ return self._get_jobs(
+ self.r.row["next_run_time"] <= datetime_to_utc_timestamp(now)
+ )
+
+ def get_next_run_time(self):
+ results = list(
+ self.table.filter(self.r.row["next_run_time"] != None)
+ .order_by(self.r.asc("next_run_time"))
+ .map(lambda x: x["next_run_time"])
+ .limit(1)
+ .run(self.conn)
+ )
+ return utc_timestamp_to_datetime(results[0]) if results else None
+
+ def get_all_jobs(self):
+ jobs = self._get_jobs()
+ self._fix_paused_jobs_sorting(jobs)
+ return jobs
+
+ def add_job(self, job):
+ job_dict = {
+ "id": job.id,
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": self.r.binary(
+ pickle.dumps(job.__getstate__(), self.pickle_protocol)
+ ),
+ }
+ results = self.table.insert(job_dict).run(self.conn)
+ if results["errors"] > 0:
+ raise ConflictingIdError(job.id)
+
+ def update_job(self, job):
+ changes = {
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": self.r.binary(
+ pickle.dumps(job.__getstate__(), self.pickle_protocol)
+ ),
+ }
+ results = self.table.get_all(job.id).update(changes).run(self.conn)
+ skipped = False in map(lambda x: results[x] == 0, results.keys())
+ if results["skipped"] > 0 or results["errors"] > 0 or not skipped:
+ raise JobLookupError(job.id)
+
+ def remove_job(self, job_id):
+ results = self.table.get_all(job_id).delete().run(self.conn)
+ if results["deleted"] + results["skipped"] != 1:
+ raise JobLookupError(job_id)
+
+ def remove_all_jobs(self):
+ self.table.delete().run(self.conn)
+
+ def shutdown(self):
+ self.conn.close()
+
+ def _reconstitute_job(self, job_state):
+ job_state = pickle.loads(job_state)
+ job = Job.__new__(Job)
+ job.__setstate__(job_state)
+ job._scheduler = self._scheduler
+ job._jobstore_alias = self._alias
+ return job
+
+ def _get_jobs(self, predicate=None):
+ jobs = []
+ failed_job_ids = []
+ query = (
+ self.table.filter(self.r.row["next_run_time"] != None).filter(predicate)
+ if predicate
+ else self.table
+ )
+ query = query.order_by("next_run_time", "id").pluck("id", "job_state")
+
+ for document in query.run(self.conn):
+ try:
+ jobs.append(self._reconstitute_job(document["job_state"]))
+ except Exception:
+ self._logger.exception(
+ 'Unable to restore job "%s" -- removing it', document["id"]
+ )
+ failed_job_ids.append(document["id"])
+
+ # Remove all the jobs we failed to restore
+ if failed_job_ids:
+ self.r.expr(failed_job_ids).for_each(
+ lambda job_id: self.table.get_all(job_id).delete()
+ ).run(self.conn)
+
+ return jobs
+
+ def __repr__(self):
+ connection = self.conn
+ return f"<{self.__class__.__name__} (connection={connection})>"
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/sqlalchemy.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/sqlalchemy.py
new file mode 100644
index 00000000..9866acf5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/sqlalchemy.py
@@ -0,0 +1,194 @@
+import pickle
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import (
+ datetime_to_utc_timestamp,
+ maybe_ref,
+ utc_timestamp_to_datetime,
+)
+
+try:
+ from sqlalchemy import (
+ Column,
+ Float,
+ LargeBinary,
+ MetaData,
+ Table,
+ Unicode,
+ and_,
+ create_engine,
+ select,
+ )
+ from sqlalchemy.exc import IntegrityError
+ from sqlalchemy.sql.expression import null
+except ImportError as exc: # pragma: nocover
+ raise ImportError("SQLAlchemyJobStore requires SQLAlchemy installed") from exc
+
+
+class SQLAlchemyJobStore(BaseJobStore):
+ """
+ Stores jobs in a database table using SQLAlchemy.
+ The table will be created if it doesn't exist in the database.
+
+ Plugin alias: ``sqlalchemy``
+
+ :param str url: connection string (see
+ :ref:`SQLAlchemy documentation <sqlalchemy:database_urls>` on this)
+ :param engine: an SQLAlchemy :class:`~sqlalchemy.engine.Engine` to use instead of creating a
+ new one based on ``url``
+ :param str tablename: name of the table to store jobs in
+ :param metadata: a :class:`~sqlalchemy.schema.MetaData` instance to use instead of creating a
+ new one
+ :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+ highest available
+ :param str tableschema: name of the (existing) schema in the target database where the table
+ should be
+ :param dict engine_options: keyword arguments to :func:`~sqlalchemy.create_engine`
+ (ignored if ``engine`` is given)
+ """
+
+ def __init__(
+ self,
+ url=None,
+ engine=None,
+ tablename="apscheduler_jobs",
+ metadata=None,
+ pickle_protocol=pickle.HIGHEST_PROTOCOL,
+ tableschema=None,
+ engine_options=None,
+ ):
+ super().__init__()
+ self.pickle_protocol = pickle_protocol
+ metadata = maybe_ref(metadata) or MetaData()
+
+ if engine:
+ self.engine = maybe_ref(engine)
+ elif url:
+ self.engine = create_engine(url, **(engine_options or {}))
+ else:
+ raise ValueError('Need either "engine" or "url" defined')
+
+ # 191 = max key length in MySQL for InnoDB/utf8mb4 tables,
+ # 25 = precision that translates to an 8-byte float
+ self.jobs_t = Table(
+ tablename,
+ metadata,
+ Column("id", Unicode(191), primary_key=True),
+ Column("next_run_time", Float(25), index=True),
+ Column("job_state", LargeBinary, nullable=False),
+ schema=tableschema,
+ )
+
+ def start(self, scheduler, alias):
+ super().start(scheduler, alias)
+ self.jobs_t.create(self.engine, True)
+
+ def lookup_job(self, job_id):
+ selectable = select(self.jobs_t.c.job_state).where(self.jobs_t.c.id == job_id)
+ with self.engine.begin() as connection:
+ job_state = connection.execute(selectable).scalar()
+ return self._reconstitute_job(job_state) if job_state else None
+
+ def get_due_jobs(self, now):
+ timestamp = datetime_to_utc_timestamp(now)
+ return self._get_jobs(self.jobs_t.c.next_run_time <= timestamp)
+
+ def get_next_run_time(self):
+ selectable = (
+ select(self.jobs_t.c.next_run_time)
+ .where(self.jobs_t.c.next_run_time != null())
+ .order_by(self.jobs_t.c.next_run_time)
+ .limit(1)
+ )
+ with self.engine.begin() as connection:
+ next_run_time = connection.execute(selectable).scalar()
+ return utc_timestamp_to_datetime(next_run_time)
+
+ def get_all_jobs(self):
+ jobs = self._get_jobs()
+ self._fix_paused_jobs_sorting(jobs)
+ return jobs
+
+ def add_job(self, job):
+ insert = self.jobs_t.insert().values(
+ **{
+ "id": job.id,
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": pickle.dumps(job.__getstate__(), self.pickle_protocol),
+ }
+ )
+ with self.engine.begin() as connection:
+ try:
+ connection.execute(insert)
+ except IntegrityError:
+ raise ConflictingIdError(job.id)
+
+ def update_job(self, job):
+ update = (
+ self.jobs_t.update()
+ .values(
+ **{
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": pickle.dumps(job.__getstate__(), self.pickle_protocol),
+ }
+ )
+ .where(self.jobs_t.c.id == job.id)
+ )
+ with self.engine.begin() as connection:
+ result = connection.execute(update)
+ if result.rowcount == 0:
+ raise JobLookupError(job.id)
+
+ def remove_job(self, job_id):
+ delete = self.jobs_t.delete().where(self.jobs_t.c.id == job_id)
+ with self.engine.begin() as connection:
+ result = connection.execute(delete)
+ if result.rowcount == 0:
+ raise JobLookupError(job_id)
+
+ def remove_all_jobs(self):
+ delete = self.jobs_t.delete()
+ with self.engine.begin() as connection:
+ connection.execute(delete)
+
+ def shutdown(self):
+ self.engine.dispose()
+
+ def _reconstitute_job(self, job_state):
+ job_state = pickle.loads(job_state)
+ job_state["jobstore"] = self
+ job = Job.__new__(Job)
+ job.__setstate__(job_state)
+ job._scheduler = self._scheduler
+ job._jobstore_alias = self._alias
+ return job
+
+ def _get_jobs(self, *conditions):
+ jobs = []
+ selectable = select(self.jobs_t.c.id, self.jobs_t.c.job_state).order_by(
+ self.jobs_t.c.next_run_time
+ )
+ selectable = selectable.where(and_(*conditions)) if conditions else selectable
+ failed_job_ids = set()
+ with self.engine.begin() as connection:
+ for row in connection.execute(selectable):
+ try:
+ jobs.append(self._reconstitute_job(row.job_state))
+ except BaseException:
+ self._logger.exception(
+ 'Unable to restore job "%s" -- removing it', row.id
+ )
+ failed_job_ids.add(row.id)
+
+ # Remove all the jobs we failed to restore
+ if failed_job_ids:
+ delete = self.jobs_t.delete().where(
+ self.jobs_t.c.id.in_(failed_job_ids)
+ )
+ connection.execute(delete)
+
+ return jobs
+
+ def __repr__(self):
+ return f"<{self.__class__.__name__} (url={self.engine.url})>"
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/zookeeper.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/zookeeper.py
new file mode 100644
index 00000000..687fbc2a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/zookeeper.py
@@ -0,0 +1,197 @@
+import pickle
+from datetime import datetime, timezone
+
+from kazoo.exceptions import NodeExistsError, NoNodeError
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import (
+ datetime_to_utc_timestamp,
+ maybe_ref,
+ utc_timestamp_to_datetime,
+)
+
+try:
+ from kazoo.client import KazooClient
+except ImportError as exc: # pragma: nocover
+ raise ImportError("ZooKeeperJobStore requires Kazoo installed") from exc
+
+
+class ZooKeeperJobStore(BaseJobStore):
+ """
+ Stores jobs in a ZooKeeper tree. Any leftover keyword arguments are directly passed to
+ kazoo's `KazooClient
+ <http://kazoo.readthedocs.io/en/latest/api/client.html>`_.
+
+ Plugin alias: ``zookeeper``
+
+ :param str path: path to store jobs in
+ :param client: a :class:`~kazoo.client.KazooClient` instance to use instead of
+ providing connection arguments
+ :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+ highest available
+ """
+
+ def __init__(
+ self,
+ path="/apscheduler",
+ client=None,
+ close_connection_on_exit=False,
+ pickle_protocol=pickle.HIGHEST_PROTOCOL,
+ **connect_args,
+ ):
+ super().__init__()
+ self.pickle_protocol = pickle_protocol
+ self.close_connection_on_exit = close_connection_on_exit
+
+ if not path:
+ raise ValueError('The "path" parameter must not be empty')
+
+ self.path = path
+
+ if client:
+ self.client = maybe_ref(client)
+ else:
+ self.client = KazooClient(**connect_args)
+ self._ensured_path = False
+
+ def _ensure_paths(self):
+ if not self._ensured_path:
+ self.client.ensure_path(self.path)
+ self._ensured_path = True
+
+ def start(self, scheduler, alias):
+ super().start(scheduler, alias)
+ if not self.client.connected:
+ self.client.start()
+
+ def lookup_job(self, job_id):
+ self._ensure_paths()
+ node_path = self.path + "/" + str(job_id)
+ try:
+ content, _ = self.client.get(node_path)
+ doc = pickle.loads(content)
+ job = self._reconstitute_job(doc["job_state"])
+ return job
+ except BaseException:
+ return None
+
+ def get_due_jobs(self, now):
+ timestamp = datetime_to_utc_timestamp(now)
+ jobs = [
+ job_def["job"]
+ for job_def in self._get_jobs()
+ if job_def["next_run_time"] is not None
+ and job_def["next_run_time"] <= timestamp
+ ]
+ return jobs
+
+ def get_next_run_time(self):
+ next_runs = [
+ job_def["next_run_time"]
+ for job_def in self._get_jobs()
+ if job_def["next_run_time"] is not None
+ ]
+ return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None
+
+ def get_all_jobs(self):
+ jobs = [job_def["job"] for job_def in self._get_jobs()]
+ self._fix_paused_jobs_sorting(jobs)
+ return jobs
+
+ def add_job(self, job):
+ self._ensure_paths()
+ node_path = self.path + "/" + str(job.id)
+ value = {
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": job.__getstate__(),
+ }
+ data = pickle.dumps(value, self.pickle_protocol)
+ try:
+ self.client.create(node_path, value=data)
+ except NodeExistsError:
+ raise ConflictingIdError(job.id)
+
+ def update_job(self, job):
+ self._ensure_paths()
+ node_path = self.path + "/" + str(job.id)
+ changes = {
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": job.__getstate__(),
+ }
+ data = pickle.dumps(changes, self.pickle_protocol)
+ try:
+ self.client.set(node_path, value=data)
+ except NoNodeError:
+ raise JobLookupError(job.id)
+
+ def remove_job(self, job_id):
+ self._ensure_paths()
+ node_path = self.path + "/" + str(job_id)
+ try:
+ self.client.delete(node_path)
+ except NoNodeError:
+ raise JobLookupError(job_id)
+
+ def remove_all_jobs(self):
+ try:
+ self.client.delete(self.path, recursive=True)
+ except NoNodeError:
+ pass
+ self._ensured_path = False
+
+ def shutdown(self):
+ if self.close_connection_on_exit:
+ self.client.stop()
+ self.client.close()
+
+ def _reconstitute_job(self, job_state):
+ job_state = job_state
+ job = Job.__new__(Job)
+ job.__setstate__(job_state)
+ job._scheduler = self._scheduler
+ job._jobstore_alias = self._alias
+ return job
+
+ def _get_jobs(self):
+ self._ensure_paths()
+ jobs = []
+ failed_job_ids = []
+ all_ids = self.client.get_children(self.path)
+ for node_name in all_ids:
+ try:
+ node_path = self.path + "/" + node_name
+ content, _ = self.client.get(node_path)
+ doc = pickle.loads(content)
+ job_def = {
+ "job_id": node_name,
+ "next_run_time": doc["next_run_time"]
+ if doc["next_run_time"]
+ else None,
+ "job_state": doc["job_state"],
+ "job": self._reconstitute_job(doc["job_state"]),
+ "creation_time": _.ctime,
+ }
+ jobs.append(job_def)
+ except BaseException:
+ self._logger.exception(
+ 'Unable to restore job "%s" -- removing it', node_name
+ )
+ failed_job_ids.append(node_name)
+
+ # Remove all the jobs we failed to restore
+ if failed_job_ids:
+ for failed_id in failed_job_ids:
+ self.remove_job(failed_id)
+ paused_sort_key = datetime(9999, 12, 31, tzinfo=timezone.utc)
+ return sorted(
+ jobs,
+ key=lambda job_def: (
+ job_def["job"].next_run_time or paused_sort_key,
+ job_def["creation_time"],
+ ),
+ )
+
+ def __repr__(self):
+ self._logger.exception("<%s (client=%s)>", self.__class__.__name__, self.client)
+ return f"<{self.__class__.__name__} (client={self.client})>"