aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py')
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py
new file mode 100644
index 00000000..d78290b1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py
@@ -0,0 +1,173 @@
+import pickle
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import (
+ datetime_to_utc_timestamp,
+ maybe_ref,
+ utc_timestamp_to_datetime,
+)
+
+try:
+ from rethinkdb import RethinkDB
+except ImportError as exc: # pragma: nocover
+ raise ImportError("RethinkDBJobStore requires rethinkdb installed") from exc
+
+
+class RethinkDBJobStore(BaseJobStore):
+ """
+ Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to
+ rethinkdb's `RethinkdbClient <http://www.rethinkdb.com/api/#connect>`_.
+
+ Plugin alias: ``rethinkdb``
+
+ :param str database: database to store jobs in
+ :param str collection: collection to store jobs in
+ :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing
+ connection arguments
+ :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+ highest available
+ """
+
+ def __init__(
+ self,
+ database="apscheduler",
+ table="jobs",
+ client=None,
+ pickle_protocol=pickle.HIGHEST_PROTOCOL,
+ **connect_args,
+ ):
+ super().__init__()
+
+ if not database:
+ raise ValueError('The "database" parameter must not be empty')
+ if not table:
+ raise ValueError('The "table" parameter must not be empty')
+
+ self.database = database
+ self.table_name = table
+ self.table = None
+ self.client = client
+ self.pickle_protocol = pickle_protocol
+ self.connect_args = connect_args
+ self.r = RethinkDB()
+ self.conn = None
+
+ def start(self, scheduler, alias):
+ super().start(scheduler, alias)
+
+ if self.client:
+ self.conn = maybe_ref(self.client)
+ else:
+ self.conn = self.r.connect(db=self.database, **self.connect_args)
+
+ if self.database not in self.r.db_list().run(self.conn):
+ self.r.db_create(self.database).run(self.conn)
+
+ if self.table_name not in self.r.table_list().run(self.conn):
+ self.r.table_create(self.table_name).run(self.conn)
+
+ if "next_run_time" not in self.r.table(self.table_name).index_list().run(
+ self.conn
+ ):
+ self.r.table(self.table_name).index_create("next_run_time").run(self.conn)
+
+ self.table = self.r.db(self.database).table(self.table_name)
+
+ def lookup_job(self, job_id):
+ results = list(self.table.get_all(job_id).pluck("job_state").run(self.conn))
+ return self._reconstitute_job(results[0]["job_state"]) if results else None
+
+ def get_due_jobs(self, now):
+ return self._get_jobs(
+ self.r.row["next_run_time"] <= datetime_to_utc_timestamp(now)
+ )
+
+ def get_next_run_time(self):
+ results = list(
+ self.table.filter(self.r.row["next_run_time"] != None)
+ .order_by(self.r.asc("next_run_time"))
+ .map(lambda x: x["next_run_time"])
+ .limit(1)
+ .run(self.conn)
+ )
+ return utc_timestamp_to_datetime(results[0]) if results else None
+
+ def get_all_jobs(self):
+ jobs = self._get_jobs()
+ self._fix_paused_jobs_sorting(jobs)
+ return jobs
+
+ def add_job(self, job):
+ job_dict = {
+ "id": job.id,
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": self.r.binary(
+ pickle.dumps(job.__getstate__(), self.pickle_protocol)
+ ),
+ }
+ results = self.table.insert(job_dict).run(self.conn)
+ if results["errors"] > 0:
+ raise ConflictingIdError(job.id)
+
+ def update_job(self, job):
+ changes = {
+ "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+ "job_state": self.r.binary(
+ pickle.dumps(job.__getstate__(), self.pickle_protocol)
+ ),
+ }
+ results = self.table.get_all(job.id).update(changes).run(self.conn)
+ skipped = False in map(lambda x: results[x] == 0, results.keys())
+ if results["skipped"] > 0 or results["errors"] > 0 or not skipped:
+ raise JobLookupError(job.id)
+
+ def remove_job(self, job_id):
+ results = self.table.get_all(job_id).delete().run(self.conn)
+ if results["deleted"] + results["skipped"] != 1:
+ raise JobLookupError(job_id)
+
+ def remove_all_jobs(self):
+ self.table.delete().run(self.conn)
+
+ def shutdown(self):
+ self.conn.close()
+
+ def _reconstitute_job(self, job_state):
+ job_state = pickle.loads(job_state)
+ job = Job.__new__(Job)
+ job.__setstate__(job_state)
+ job._scheduler = self._scheduler
+ job._jobstore_alias = self._alias
+ return job
+
+ def _get_jobs(self, predicate=None):
+ jobs = []
+ failed_job_ids = []
+ query = (
+ self.table.filter(self.r.row["next_run_time"] != None).filter(predicate)
+ if predicate
+ else self.table
+ )
+ query = query.order_by("next_run_time", "id").pluck("id", "job_state")
+
+ for document in query.run(self.conn):
+ try:
+ jobs.append(self._reconstitute_job(document["job_state"]))
+ except Exception:
+ self._logger.exception(
+ 'Unable to restore job "%s" -- removing it', document["id"]
+ )
+ failed_job_ids.append(document["id"])
+
+ # Remove all the jobs we failed to restore
+ if failed_job_ids:
+ self.r.expr(failed_job_ids).for_each(
+ lambda job_id: self.table.get_all(job_id).delete()
+ ).run(self.conn)
+
+ return jobs
+
+ def __repr__(self):
+ connection = self.conn
+ return f"<{self.__class__.__name__} (connection={connection})>"