author    | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
commit    | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/apscheduler/jobstores
parent    | cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/jobstores')
9 files changed, 1299 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/__init__.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/__init__.py diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/base.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/base.py new file mode 100644 index 00000000..01cabd1e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/base.py @@ -0,0 +1,141 @@ +import logging +from abc import ABCMeta, abstractmethod + + +class JobLookupError(KeyError): + """Raised when the job store cannot find a job for update or removal.""" + + def __init__(self, job_id): + super().__init__(f"No job by the id of {job_id} was found") + + +class ConflictingIdError(KeyError): + """Raised when the uniqueness of job IDs is being violated.""" + + def __init__(self, job_id): + super().__init__(f"Job identifier ({job_id}) conflicts with an existing job") + + +class TransientJobError(ValueError): + """ + Raised when an attempt to add transient (with no func_ref) job to a persistent job store is + detected. + """ + + def __init__(self, job_id): + super().__init__( + f"Job ({job_id}) cannot be added to this job store because a reference to the callable " + "could not be determined." + ) + + +class BaseJobStore(metaclass=ABCMeta): + """Abstract base class that defines the interface that every job store must implement.""" + + _scheduler = None + _alias = None + _logger = logging.getLogger("apscheduler.jobstores") + + def start(self, scheduler, alias): + """ + Called by the scheduler when the scheduler is being started or when the job store is being + added to an already running scheduler. + + :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting + this job store + :param str|unicode alias: alias of this job store as it was assigned to the scheduler + """ + + self._scheduler = scheduler + self._alias = alias + self._logger = logging.getLogger(f"apscheduler.jobstores.{alias}") + + def shutdown(self): + """Frees any resources still bound to this job store.""" + + def _fix_paused_jobs_sorting(self, jobs): + for i, job in enumerate(jobs): + if job.next_run_time is not None: + if i > 0: + paused_jobs = jobs[:i] + del jobs[:i] + jobs.extend(paused_jobs) + break + + @abstractmethod + def lookup_job(self, job_id): + """ + Returns a specific job, or ``None`` if it isn't found.. + + The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of + the returned job to point to the scheduler and itself, respectively. + + :param str|unicode job_id: identifier of the job + :rtype: Job + """ + + @abstractmethod + def get_due_jobs(self, now): + """ + Returns the list of jobs that have ``next_run_time`` earlier or equal to ``now``. + The returned jobs must be sorted by next run time (ascending). + + :param datetime.datetime now: the current (timezone aware) datetime + :rtype: list[Job] + """ + + @abstractmethod + def get_next_run_time(self): + """ + Returns the earliest run time of all the jobs stored in this job store, or ``None`` if + there are no active jobs. + + :rtype: datetime.datetime + """ + + @abstractmethod + def get_all_jobs(self): + """ + Returns a list of all jobs in this job store. + The returned jobs should be sorted by next run time (ascending). + Paused jobs (next_run_time == None) should be sorted last. 
+ + The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of + the returned jobs to point to the scheduler and itself, respectively. + + :rtype: list[Job] + """ + + @abstractmethod + def add_job(self, job): + """ + Adds the given job to this store. + + :param Job job: the job to add + :raises ConflictingIdError: if there is another job in this store with the same ID + """ + + @abstractmethod + def update_job(self, job): + """ + Replaces the job in the store with the given newer version. + + :param Job job: the job to update + :raises JobLookupError: if the job does not exist + """ + + @abstractmethod + def remove_job(self, job_id): + """ + Removes the given job from this store. + + :param str|unicode job_id: identifier of the job + :raises JobLookupError: if the job does not exist + """ + + @abstractmethod + def remove_all_jobs(self): + """Removes all jobs from this store.""" + + def __repr__(self): + return f"<{self.__class__.__name__}>" diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/etcd.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/etcd.py new file mode 100644 index 00000000..3fe74ff1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/etcd.py @@ -0,0 +1,170 @@ +import pickle +from datetime import datetime, timezone + +from apscheduler.job import Job +from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError +from apscheduler.util import ( + datetime_to_utc_timestamp, + maybe_ref, + utc_timestamp_to_datetime, +) + +try: + from etcd3 import Etcd3Client +except ImportError as exc: # pragma: nocover + raise ImportError("EtcdJobStore requires etcd3 be installed") from exc + + +class EtcdJobStore(BaseJobStore): + """ + Stores jobs in a etcd. Any leftover keyword arguments are directly passed to + etcd3's `etcd3.client + <https://python-etcd3.readthedocs.io/en/latest/readme.html>`_. 
+ + Plugin alias: ``etcd`` + + :param str path: path to store jobs in + :param client: a :class:`~etcd3.client.etcd3` instance to use instead of + providing connection arguments + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__( + self, + path="/apscheduler", + client=None, + close_connection_on_exit=False, + pickle_protocol=pickle.DEFAULT_PROTOCOL, + **connect_args, + ): + super().__init__() + self.pickle_protocol = pickle_protocol + self.close_connection_on_exit = close_connection_on_exit + + if not path: + raise ValueError('The "path" parameter must not be empty') + + self.path = path + + if client: + self.client = maybe_ref(client) + else: + self.client = Etcd3Client(**connect_args) + + def lookup_job(self, job_id): + node_path = self.path + "/" + str(job_id) + try: + content, _ = self.client.get(node_path) + content = pickle.loads(content) + job = self._reconstitute_job(content["job_state"]) + return job + except BaseException: + return None + + def get_due_jobs(self, now): + timestamp = datetime_to_utc_timestamp(now) + jobs = [ + job_record["job"] + for job_record in self._get_jobs() + if job_record["next_run_time"] is not None + and job_record["next_run_time"] <= timestamp + ] + return jobs + + def get_next_run_time(self): + next_runs = [ + job_record["next_run_time"] + for job_record in self._get_jobs() + if job_record["next_run_time"] is not None + ] + return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None + + def get_all_jobs(self): + jobs = [job_record["job"] for job_record in self._get_jobs()] + self._fix_paused_jobs_sorting(jobs) + return jobs + + def add_job(self, job): + node_path = self.path + "/" + str(job.id) + value = { + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": job.__getstate__(), + } + data = pickle.dumps(value, self.pickle_protocol) + status = self.client.put_if_not_exists(node_path, value=data) + if not status: + raise ConflictingIdError(job.id) + + def update_job(self, job): + node_path = self.path + "/" + str(job.id) + changes = { + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": job.__getstate__(), + } + data = pickle.dumps(changes, self.pickle_protocol) + status, _ = self.client.transaction( + compare=[self.client.transactions.version(node_path) > 0], + success=[self.client.transactions.put(node_path, value=data)], + failure=[], + ) + if not status: + raise JobLookupError(job.id) + + def remove_job(self, job_id): + node_path = self.path + "/" + str(job_id) + status, _ = self.client.transaction( + compare=[self.client.transactions.version(node_path) > 0], + success=[self.client.transactions.delete(node_path)], + failure=[], + ) + if not status: + raise JobLookupError(job_id) + + def remove_all_jobs(self): + self.client.delete_prefix(self.path) + + def shutdown(self): + self.client.close() + + def _reconstitute_job(self, job_state): + job_state = job_state + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _get_jobs(self): + jobs = [] + failed_job_ids = [] + all_ids = list(self.client.get_prefix(self.path)) + + for doc, _ in all_ids: + try: + content = pickle.loads(doc) + job_record = { + "next_run_time": content["next_run_time"], + "job": self._reconstitute_job(content["job_state"]), + } + jobs.append(job_record) + except BaseException: + content = pickle.loads(doc) + failed_id = 
content["job_state"]["id"] + failed_job_ids.append(failed_id) + self._logger.exception( + 'Unable to restore job "%s" -- removing it', failed_id + ) + + if failed_job_ids: + for failed_id in failed_job_ids: + self.remove_job(failed_id) + paused_sort_key = datetime(9999, 12, 31, tzinfo=timezone.utc) + return sorted( + jobs, + key=lambda job_record: job_record["job"].next_run_time or paused_sort_key, + ) + + def __repr__(self): + self._logger.exception("<%s (client=%s)>", self.__class__.__name__, self.client) + return f"<{self.__class__.__name__} (client={self.client})>" diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/memory.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/memory.py new file mode 100644 index 00000000..8103cfdc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/memory.py @@ -0,0 +1,106 @@ +from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError +from apscheduler.util import datetime_to_utc_timestamp + + +class MemoryJobStore(BaseJobStore): + """ + Stores jobs in an array in RAM. Provides no persistence support. + + Plugin alias: ``memory`` + """ + + def __init__(self): + super().__init__() + # list of (job, timestamp), sorted by next_run_time and job id (ascending) + self._jobs = [] + self._jobs_index = {} # id -> (job, timestamp) lookup table + + def lookup_job(self, job_id): + return self._jobs_index.get(job_id, (None, None))[0] + + def get_due_jobs(self, now): + now_timestamp = datetime_to_utc_timestamp(now) + pending = [] + for job, timestamp in self._jobs: + if timestamp is None or timestamp > now_timestamp: + break + pending.append(job) + + return pending + + def get_next_run_time(self): + return self._jobs[0][0].next_run_time if self._jobs else None + + def get_all_jobs(self): + return [j[0] for j in self._jobs] + + def add_job(self, job): + if job.id in self._jobs_index: + raise ConflictingIdError(job.id) + + timestamp = datetime_to_utc_timestamp(job.next_run_time) + index = self._get_job_index(timestamp, job.id) + self._jobs.insert(index, (job, timestamp)) + self._jobs_index[job.id] = (job, timestamp) + + def update_job(self, job): + old_job, old_timestamp = self._jobs_index.get(job.id, (None, None)) + if old_job is None: + raise JobLookupError(job.id) + + # If the next run time has not changed, simply replace the job in its present index. + # Otherwise, reinsert the job to the list to preserve the ordering. + old_index = self._get_job_index(old_timestamp, old_job.id) + new_timestamp = datetime_to_utc_timestamp(job.next_run_time) + if old_timestamp == new_timestamp: + self._jobs[old_index] = (job, new_timestamp) + else: + del self._jobs[old_index] + new_index = self._get_job_index(new_timestamp, job.id) + self._jobs.insert(new_index, (job, new_timestamp)) + + self._jobs_index[old_job.id] = (job, new_timestamp) + + def remove_job(self, job_id): + job, timestamp = self._jobs_index.get(job_id, (None, None)) + if job is None: + raise JobLookupError(job_id) + + index = self._get_job_index(timestamp, job_id) + del self._jobs[index] + del self._jobs_index[job.id] + + def remove_all_jobs(self): + self._jobs = [] + self._jobs_index = {} + + def shutdown(self): + self.remove_all_jobs() + + def _get_job_index(self, timestamp, job_id): + """ + Returns the index of the given job, or if it's not found, the index where the job should be + inserted based on the given timestamp. 
+ + :type timestamp: int + :type job_id: str + + """ + lo, hi = 0, len(self._jobs) + timestamp = float("inf") if timestamp is None else timestamp + while lo < hi: + mid = (lo + hi) // 2 + mid_job, mid_timestamp = self._jobs[mid] + mid_timestamp = float("inf") if mid_timestamp is None else mid_timestamp + if mid_timestamp > timestamp: + hi = mid + elif mid_timestamp < timestamp: + lo = mid + 1 + elif mid_job.id > job_id: + hi = mid + elif mid_job.id < job_id: + lo = mid + 1 + else: + return mid + + return lo diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/mongodb.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/mongodb.py new file mode 100644 index 00000000..102c0bd0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/mongodb.py @@ -0,0 +1,158 @@ +import pickle +import warnings + +from apscheduler.job import Job +from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError +from apscheduler.util import ( + datetime_to_utc_timestamp, + maybe_ref, + utc_timestamp_to_datetime, +) + +try: + from bson.binary import Binary + from pymongo import ASCENDING, MongoClient + from pymongo.errors import DuplicateKeyError +except ImportError as exc: # pragma: nocover + raise ImportError("MongoDBJobStore requires PyMongo installed") from exc + + +class MongoDBJobStore(BaseJobStore): + """ + Stores jobs in a MongoDB database. Any leftover keyword arguments are directly passed to + pymongo's `MongoClient + <http://api.mongodb.org/python/current/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient>`_. + + Plugin alias: ``mongodb`` + + :param str database: database to store jobs in + :param str collection: collection to store jobs in + :param client: a :class:`~pymongo.mongo_client.MongoClient` instance to use instead of + providing connection arguments + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__( + self, + database="apscheduler", + collection="jobs", + client=None, + pickle_protocol=pickle.HIGHEST_PROTOCOL, + **connect_args, + ): + super().__init__() + self.pickle_protocol = pickle_protocol + + if not database: + raise ValueError('The "database" parameter must not be empty') + if not collection: + raise ValueError('The "collection" parameter must not be empty') + + if client: + self.client = maybe_ref(client) + else: + connect_args.setdefault("w", 1) + self.client = MongoClient(**connect_args) + + self.collection = self.client[database][collection] + + def start(self, scheduler, alias): + super().start(scheduler, alias) + self.collection.create_index("next_run_time", sparse=True) + + @property + def connection(self): + warnings.warn( + 'The "connection" member is deprecated -- use "client" instead', + DeprecationWarning, + ) + return self.client + + def lookup_job(self, job_id): + document = self.collection.find_one(job_id, ["job_state"]) + return self._reconstitute_job(document["job_state"]) if document else None + + def get_due_jobs(self, now): + timestamp = datetime_to_utc_timestamp(now) + return self._get_jobs({"next_run_time": {"$lte": timestamp}}) + + def get_next_run_time(self): + document = self.collection.find_one( + {"next_run_time": {"$ne": None}}, + projection=["next_run_time"], + sort=[("next_run_time", ASCENDING)], + ) + return ( + utc_timestamp_to_datetime(document["next_run_time"]) if document else None + ) + + def get_all_jobs(self): + jobs = self._get_jobs({}) + self._fix_paused_jobs_sorting(jobs) + 
return jobs + + def add_job(self, job): + try: + self.collection.insert_one( + { + "_id": job.id, + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": Binary( + pickle.dumps(job.__getstate__(), self.pickle_protocol) + ), + } + ) + except DuplicateKeyError: + raise ConflictingIdError(job.id) + + def update_job(self, job): + changes = { + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)), + } + result = self.collection.update_one({"_id": job.id}, {"$set": changes}) + if result and result.matched_count == 0: + raise JobLookupError(job.id) + + def remove_job(self, job_id): + result = self.collection.delete_one({"_id": job_id}) + if result and result.deleted_count == 0: + raise JobLookupError(job_id) + + def remove_all_jobs(self): + self.collection.delete_many({}) + + def shutdown(self): + self.client.close() + + def _reconstitute_job(self, job_state): + job_state = pickle.loads(job_state) + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _get_jobs(self, conditions): + jobs = [] + failed_job_ids = [] + for document in self.collection.find( + conditions, ["_id", "job_state"], sort=[("next_run_time", ASCENDING)] + ): + try: + jobs.append(self._reconstitute_job(document["job_state"])) + except BaseException: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', document["_id"] + ) + failed_job_ids.append(document["_id"]) + + # Remove all the jobs we failed to restore + if failed_job_ids: + self.collection.delete_many({"_id": {"$in": failed_job_ids}}) + + return jobs + + def __repr__(self): + return f"<{self.__class__.__name__} (client={self.client})>" diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py new file mode 100644 index 00000000..528285fb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py @@ -0,0 +1,160 @@ +import pickle +from datetime import datetime, timezone + +from apscheduler.job import Job +from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError +from apscheduler.util import datetime_to_utc_timestamp, utc_timestamp_to_datetime + +try: + from redis import Redis +except ImportError as exc: # pragma: nocover + raise ImportError("RedisJobStore requires redis installed") from exc + + +class RedisJobStore(BaseJobStore): + """ + Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's + :class:`~redis.StrictRedis`. 
+ + Plugin alias: ``redis`` + + :param int db: the database number to store jobs in + :param str jobs_key: key to store jobs in + :param str run_times_key: key to store the jobs' run times in + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__( + self, + db=0, + jobs_key="apscheduler.jobs", + run_times_key="apscheduler.run_times", + pickle_protocol=pickle.HIGHEST_PROTOCOL, + **connect_args, + ): + super().__init__() + + if db is None: + raise ValueError('The "db" parameter must not be empty') + if not jobs_key: + raise ValueError('The "jobs_key" parameter must not be empty') + if not run_times_key: + raise ValueError('The "run_times_key" parameter must not be empty') + + self.pickle_protocol = pickle_protocol + self.jobs_key = jobs_key + self.run_times_key = run_times_key + self.redis = Redis(db=int(db), **connect_args) + + def lookup_job(self, job_id): + job_state = self.redis.hget(self.jobs_key, job_id) + return self._reconstitute_job(job_state) if job_state else None + + def get_due_jobs(self, now): + timestamp = datetime_to_utc_timestamp(now) + job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp) + if job_ids: + job_states = self.redis.hmget(self.jobs_key, *job_ids) + return self._reconstitute_jobs(zip(job_ids, job_states)) + return [] + + def get_next_run_time(self): + next_run_time = self.redis.zrange(self.run_times_key, 0, 0, withscores=True) + if next_run_time: + return utc_timestamp_to_datetime(next_run_time[0][1]) + + def get_all_jobs(self): + job_states = self.redis.hgetall(self.jobs_key) + jobs = self._reconstitute_jobs(job_states.items()) + paused_sort_key = datetime(9999, 12, 31, tzinfo=timezone.utc) + return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key) + + def add_job(self, job): + if self.redis.hexists(self.jobs_key, job.id): + raise ConflictingIdError(job.id) + + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.hset( + self.jobs_key, + job.id, + pickle.dumps(job.__getstate__(), self.pickle_protocol), + ) + if job.next_run_time: + pipe.zadd( + self.run_times_key, + {job.id: datetime_to_utc_timestamp(job.next_run_time)}, + ) + + pipe.execute() + + def update_job(self, job): + if not self.redis.hexists(self.jobs_key, job.id): + raise JobLookupError(job.id) + + with self.redis.pipeline() as pipe: + pipe.hset( + self.jobs_key, + job.id, + pickle.dumps(job.__getstate__(), self.pickle_protocol), + ) + if job.next_run_time: + pipe.zadd( + self.run_times_key, + {job.id: datetime_to_utc_timestamp(job.next_run_time)}, + ) + else: + pipe.zrem(self.run_times_key, job.id) + + pipe.execute() + + def remove_job(self, job_id): + if not self.redis.hexists(self.jobs_key, job_id): + raise JobLookupError(job_id) + + with self.redis.pipeline() as pipe: + pipe.hdel(self.jobs_key, job_id) + pipe.zrem(self.run_times_key, job_id) + pipe.execute() + + def remove_all_jobs(self): + with self.redis.pipeline() as pipe: + pipe.delete(self.jobs_key) + pipe.delete(self.run_times_key) + pipe.execute() + + def shutdown(self): + self.redis.connection_pool.disconnect() + + def _reconstitute_job(self, job_state): + job_state = pickle.loads(job_state) + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _reconstitute_jobs(self, job_states): + jobs = [] + failed_job_ids = [] + for job_id, job_state in job_states: + try: + jobs.append(self._reconstitute_job(job_state)) + except 
BaseException: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', job_id + ) + failed_job_ids.append(job_id) + + # Remove all the jobs we failed to restore + if failed_job_ids: + with self.redis.pipeline() as pipe: + pipe.hdel(self.jobs_key, *failed_job_ids) + pipe.zrem(self.run_times_key, *failed_job_ids) + pipe.execute() + + return jobs + + def __repr__(self): + return f"<{self.__class__.__name__}>" diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py new file mode 100644 index 00000000..d78290b1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py @@ -0,0 +1,173 @@ +import pickle + +from apscheduler.job import Job +from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError +from apscheduler.util import ( + datetime_to_utc_timestamp, + maybe_ref, + utc_timestamp_to_datetime, +) + +try: + from rethinkdb import RethinkDB +except ImportError as exc: # pragma: nocover + raise ImportError("RethinkDBJobStore requires rethinkdb installed") from exc + + +class RethinkDBJobStore(BaseJobStore): + """ + Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to + rethinkdb's `RethinkdbClient <http://www.rethinkdb.com/api/#connect>`_. + + Plugin alias: ``rethinkdb`` + + :param str database: database to store jobs in + :param str collection: collection to store jobs in + :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing + connection arguments + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__( + self, + database="apscheduler", + table="jobs", + client=None, + pickle_protocol=pickle.HIGHEST_PROTOCOL, + **connect_args, + ): + super().__init__() + + if not database: + raise ValueError('The "database" parameter must not be empty') + if not table: + raise ValueError('The "table" parameter must not be empty') + + self.database = database + self.table_name = table + self.table = None + self.client = client + self.pickle_protocol = pickle_protocol + self.connect_args = connect_args + self.r = RethinkDB() + self.conn = None + + def start(self, scheduler, alias): + super().start(scheduler, alias) + + if self.client: + self.conn = maybe_ref(self.client) + else: + self.conn = self.r.connect(db=self.database, **self.connect_args) + + if self.database not in self.r.db_list().run(self.conn): + self.r.db_create(self.database).run(self.conn) + + if self.table_name not in self.r.table_list().run(self.conn): + self.r.table_create(self.table_name).run(self.conn) + + if "next_run_time" not in self.r.table(self.table_name).index_list().run( + self.conn + ): + self.r.table(self.table_name).index_create("next_run_time").run(self.conn) + + self.table = self.r.db(self.database).table(self.table_name) + + def lookup_job(self, job_id): + results = list(self.table.get_all(job_id).pluck("job_state").run(self.conn)) + return self._reconstitute_job(results[0]["job_state"]) if results else None + + def get_due_jobs(self, now): + return self._get_jobs( + self.r.row["next_run_time"] <= datetime_to_utc_timestamp(now) + ) + + def get_next_run_time(self): + results = list( + self.table.filter(self.r.row["next_run_time"] != None) + .order_by(self.r.asc("next_run_time")) + .map(lambda x: x["next_run_time"]) + .limit(1) + .run(self.conn) + ) + return utc_timestamp_to_datetime(results[0]) if results 
else None + + def get_all_jobs(self): + jobs = self._get_jobs() + self._fix_paused_jobs_sorting(jobs) + return jobs + + def add_job(self, job): + job_dict = { + "id": job.id, + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": self.r.binary( + pickle.dumps(job.__getstate__(), self.pickle_protocol) + ), + } + results = self.table.insert(job_dict).run(self.conn) + if results["errors"] > 0: + raise ConflictingIdError(job.id) + + def update_job(self, job): + changes = { + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": self.r.binary( + pickle.dumps(job.__getstate__(), self.pickle_protocol) + ), + } + results = self.table.get_all(job.id).update(changes).run(self.conn) + skipped = False in map(lambda x: results[x] == 0, results.keys()) + if results["skipped"] > 0 or results["errors"] > 0 or not skipped: + raise JobLookupError(job.id) + + def remove_job(self, job_id): + results = self.table.get_all(job_id).delete().run(self.conn) + if results["deleted"] + results["skipped"] != 1: + raise JobLookupError(job_id) + + def remove_all_jobs(self): + self.table.delete().run(self.conn) + + def shutdown(self): + self.conn.close() + + def _reconstitute_job(self, job_state): + job_state = pickle.loads(job_state) + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _get_jobs(self, predicate=None): + jobs = [] + failed_job_ids = [] + query = ( + self.table.filter(self.r.row["next_run_time"] != None).filter(predicate) + if predicate + else self.table + ) + query = query.order_by("next_run_time", "id").pluck("id", "job_state") + + for document in query.run(self.conn): + try: + jobs.append(self._reconstitute_job(document["job_state"])) + except Exception: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', document["id"] + ) + failed_job_ids.append(document["id"]) + + # Remove all the jobs we failed to restore + if failed_job_ids: + self.r.expr(failed_job_ids).for_each( + lambda job_id: self.table.get_all(job_id).delete() + ).run(self.conn) + + return jobs + + def __repr__(self): + connection = self.conn + return f"<{self.__class__.__name__} (connection={connection})>" diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/sqlalchemy.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/sqlalchemy.py new file mode 100644 index 00000000..9866acf5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/sqlalchemy.py @@ -0,0 +1,194 @@ +import pickle + +from apscheduler.job import Job +from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError +from apscheduler.util import ( + datetime_to_utc_timestamp, + maybe_ref, + utc_timestamp_to_datetime, +) + +try: + from sqlalchemy import ( + Column, + Float, + LargeBinary, + MetaData, + Table, + Unicode, + and_, + create_engine, + select, + ) + from sqlalchemy.exc import IntegrityError + from sqlalchemy.sql.expression import null +except ImportError as exc: # pragma: nocover + raise ImportError("SQLAlchemyJobStore requires SQLAlchemy installed") from exc + + +class SQLAlchemyJobStore(BaseJobStore): + """ + Stores jobs in a database table using SQLAlchemy. + The table will be created if it doesn't exist in the database. 
+ + Plugin alias: ``sqlalchemy`` + + :param str url: connection string (see + :ref:`SQLAlchemy documentation <sqlalchemy:database_urls>` on this) + :param engine: an SQLAlchemy :class:`~sqlalchemy.engine.Engine` to use instead of creating a + new one based on ``url`` + :param str tablename: name of the table to store jobs in + :param metadata: a :class:`~sqlalchemy.schema.MetaData` instance to use instead of creating a + new one + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + :param str tableschema: name of the (existing) schema in the target database where the table + should be + :param dict engine_options: keyword arguments to :func:`~sqlalchemy.create_engine` + (ignored if ``engine`` is given) + """ + + def __init__( + self, + url=None, + engine=None, + tablename="apscheduler_jobs", + metadata=None, + pickle_protocol=pickle.HIGHEST_PROTOCOL, + tableschema=None, + engine_options=None, + ): + super().__init__() + self.pickle_protocol = pickle_protocol + metadata = maybe_ref(metadata) or MetaData() + + if engine: + self.engine = maybe_ref(engine) + elif url: + self.engine = create_engine(url, **(engine_options or {})) + else: + raise ValueError('Need either "engine" or "url" defined') + + # 191 = max key length in MySQL for InnoDB/utf8mb4 tables, + # 25 = precision that translates to an 8-byte float + self.jobs_t = Table( + tablename, + metadata, + Column("id", Unicode(191), primary_key=True), + Column("next_run_time", Float(25), index=True), + Column("job_state", LargeBinary, nullable=False), + schema=tableschema, + ) + + def start(self, scheduler, alias): + super().start(scheduler, alias) + self.jobs_t.create(self.engine, True) + + def lookup_job(self, job_id): + selectable = select(self.jobs_t.c.job_state).where(self.jobs_t.c.id == job_id) + with self.engine.begin() as connection: + job_state = connection.execute(selectable).scalar() + return self._reconstitute_job(job_state) if job_state else None + + def get_due_jobs(self, now): + timestamp = datetime_to_utc_timestamp(now) + return self._get_jobs(self.jobs_t.c.next_run_time <= timestamp) + + def get_next_run_time(self): + selectable = ( + select(self.jobs_t.c.next_run_time) + .where(self.jobs_t.c.next_run_time != null()) + .order_by(self.jobs_t.c.next_run_time) + .limit(1) + ) + with self.engine.begin() as connection: + next_run_time = connection.execute(selectable).scalar() + return utc_timestamp_to_datetime(next_run_time) + + def get_all_jobs(self): + jobs = self._get_jobs() + self._fix_paused_jobs_sorting(jobs) + return jobs + + def add_job(self, job): + insert = self.jobs_t.insert().values( + **{ + "id": job.id, + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": pickle.dumps(job.__getstate__(), self.pickle_protocol), + } + ) + with self.engine.begin() as connection: + try: + connection.execute(insert) + except IntegrityError: + raise ConflictingIdError(job.id) + + def update_job(self, job): + update = ( + self.jobs_t.update() + .values( + **{ + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": pickle.dumps(job.__getstate__(), self.pickle_protocol), + } + ) + .where(self.jobs_t.c.id == job.id) + ) + with self.engine.begin() as connection: + result = connection.execute(update) + if result.rowcount == 0: + raise JobLookupError(job.id) + + def remove_job(self, job_id): + delete = self.jobs_t.delete().where(self.jobs_t.c.id == job_id) + with self.engine.begin() as connection: + result = 
connection.execute(delete) + if result.rowcount == 0: + raise JobLookupError(job_id) + + def remove_all_jobs(self): + delete = self.jobs_t.delete() + with self.engine.begin() as connection: + connection.execute(delete) + + def shutdown(self): + self.engine.dispose() + + def _reconstitute_job(self, job_state): + job_state = pickle.loads(job_state) + job_state["jobstore"] = self + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _get_jobs(self, *conditions): + jobs = [] + selectable = select(self.jobs_t.c.id, self.jobs_t.c.job_state).order_by( + self.jobs_t.c.next_run_time + ) + selectable = selectable.where(and_(*conditions)) if conditions else selectable + failed_job_ids = set() + with self.engine.begin() as connection: + for row in connection.execute(selectable): + try: + jobs.append(self._reconstitute_job(row.job_state)) + except BaseException: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', row.id + ) + failed_job_ids.add(row.id) + + # Remove all the jobs we failed to restore + if failed_job_ids: + delete = self.jobs_t.delete().where( + self.jobs_t.c.id.in_(failed_job_ids) + ) + connection.execute(delete) + + return jobs + + def __repr__(self): + return f"<{self.__class__.__name__} (url={self.engine.url})>" diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/zookeeper.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/zookeeper.py new file mode 100644 index 00000000..687fbc2a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/zookeeper.py @@ -0,0 +1,197 @@ +import pickle +from datetime import datetime, timezone + +from kazoo.exceptions import NodeExistsError, NoNodeError + +from apscheduler.job import Job +from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError +from apscheduler.util import ( + datetime_to_utc_timestamp, + maybe_ref, + utc_timestamp_to_datetime, +) + +try: + from kazoo.client import KazooClient +except ImportError as exc: # pragma: nocover + raise ImportError("ZooKeeperJobStore requires Kazoo installed") from exc + + +class ZooKeeperJobStore(BaseJobStore): + """ + Stores jobs in a ZooKeeper tree. Any leftover keyword arguments are directly passed to + kazoo's `KazooClient + <http://kazoo.readthedocs.io/en/latest/api/client.html>`_. 
+ + Plugin alias: ``zookeeper`` + + :param str path: path to store jobs in + :param client: a :class:`~kazoo.client.KazooClient` instance to use instead of + providing connection arguments + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__( + self, + path="/apscheduler", + client=None, + close_connection_on_exit=False, + pickle_protocol=pickle.HIGHEST_PROTOCOL, + **connect_args, + ): + super().__init__() + self.pickle_protocol = pickle_protocol + self.close_connection_on_exit = close_connection_on_exit + + if not path: + raise ValueError('The "path" parameter must not be empty') + + self.path = path + + if client: + self.client = maybe_ref(client) + else: + self.client = KazooClient(**connect_args) + self._ensured_path = False + + def _ensure_paths(self): + if not self._ensured_path: + self.client.ensure_path(self.path) + self._ensured_path = True + + def start(self, scheduler, alias): + super().start(scheduler, alias) + if not self.client.connected: + self.client.start() + + def lookup_job(self, job_id): + self._ensure_paths() + node_path = self.path + "/" + str(job_id) + try: + content, _ = self.client.get(node_path) + doc = pickle.loads(content) + job = self._reconstitute_job(doc["job_state"]) + return job + except BaseException: + return None + + def get_due_jobs(self, now): + timestamp = datetime_to_utc_timestamp(now) + jobs = [ + job_def["job"] + for job_def in self._get_jobs() + if job_def["next_run_time"] is not None + and job_def["next_run_time"] <= timestamp + ] + return jobs + + def get_next_run_time(self): + next_runs = [ + job_def["next_run_time"] + for job_def in self._get_jobs() + if job_def["next_run_time"] is not None + ] + return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None + + def get_all_jobs(self): + jobs = [job_def["job"] for job_def in self._get_jobs()] + self._fix_paused_jobs_sorting(jobs) + return jobs + + def add_job(self, job): + self._ensure_paths() + node_path = self.path + "/" + str(job.id) + value = { + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": job.__getstate__(), + } + data = pickle.dumps(value, self.pickle_protocol) + try: + self.client.create(node_path, value=data) + except NodeExistsError: + raise ConflictingIdError(job.id) + + def update_job(self, job): + self._ensure_paths() + node_path = self.path + "/" + str(job.id) + changes = { + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": job.__getstate__(), + } + data = pickle.dumps(changes, self.pickle_protocol) + try: + self.client.set(node_path, value=data) + except NoNodeError: + raise JobLookupError(job.id) + + def remove_job(self, job_id): + self._ensure_paths() + node_path = self.path + "/" + str(job_id) + try: + self.client.delete(node_path) + except NoNodeError: + raise JobLookupError(job_id) + + def remove_all_jobs(self): + try: + self.client.delete(self.path, recursive=True) + except NoNodeError: + pass + self._ensured_path = False + + def shutdown(self): + if self.close_connection_on_exit: + self.client.stop() + self.client.close() + + def _reconstitute_job(self, job_state): + job_state = job_state + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _get_jobs(self): + self._ensure_paths() + jobs = [] + failed_job_ids = [] + all_ids = self.client.get_children(self.path) + for node_name in all_ids: + try: + node_path = 
self.path + "/" + node_name + content, _ = self.client.get(node_path) + doc = pickle.loads(content) + job_def = { + "job_id": node_name, + "next_run_time": doc["next_run_time"] + if doc["next_run_time"] + else None, + "job_state": doc["job_state"], + "job": self._reconstitute_job(doc["job_state"]), + "creation_time": _.ctime, + } + jobs.append(job_def) + except BaseException: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', node_name + ) + failed_job_ids.append(node_name) + + # Remove all the jobs we failed to restore + if failed_job_ids: + for failed_id in failed_job_ids: + self.remove_job(failed_id) + paused_sort_key = datetime(9999, 12, 31, tzinfo=timezone.utc) + return sorted( + jobs, + key=lambda job_def: ( + job_def["job"].next_run_time or paused_sort_key, + job_def["creation_time"], + ), + ) + + def __repr__(self): + self._logger.exception("<%s (client=%s)>", self.__class__.__name__, self.client) + return f"<{self.__class__.__name__} (client={self.client})>" |