author    S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py  173
1 file changed, 173 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py
new file mode 100644
index 00000000..d78290b1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/rethinkdb.py
@@ -0,0 +1,173 @@
+import pickle
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import (
+    datetime_to_utc_timestamp,
+    maybe_ref,
+    utc_timestamp_to_datetime,
+)
+
+try:
+    from rethinkdb import RethinkDB
+except ImportError as exc:  # pragma: nocover
+    raise ImportError("RethinkDBJobStore requires rethinkdb installed") from exc
+
+
+class RethinkDBJobStore(BaseJobStore):
+    """
+    Stores jobs in a RethinkDB database. Any leftover keyword arguments are passed directly to
+    rethinkdb's `connect <http://www.rethinkdb.com/api/#connect>`_ call.
+
+    Plugin alias: ``rethinkdb``
+
+    :param str database: database to store jobs in
+    :param str table: table to store jobs in
+    :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing
+        connection arguments
+    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+        highest available
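+
+    A minimal usage sketch (assumes a locally reachable RethinkDB server and the
+    default connection arguments)::
+
+        from apscheduler.schedulers.background import BackgroundScheduler
+
+        scheduler = BackgroundScheduler()
+        scheduler.add_jobstore("rethinkdb", database="apscheduler", table="jobs")
+        scheduler.start()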
+    """
+
+    def __init__(
+        self,
+        database="apscheduler",
+        table="jobs",
+        client=None,
+        pickle_protocol=pickle.HIGHEST_PROTOCOL,
+        **connect_args,
+    ):
+        super().__init__()
+
+        if not database:
+            raise ValueError('The "database" parameter must not be empty')
+        if not table:
+            raise ValueError('The "table" parameter must not be empty')
+
+        self.database = database
+        self.table_name = table
+        self.table = None
+        self.client = client
+        self.pickle_protocol = pickle_protocol
+        self.connect_args = connect_args
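+        # The connection itself is established lazily in start()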
+        self.r = RethinkDB()
+        self.conn = None
+
+    def start(self, scheduler, alias):
+        super().start(scheduler, alias)
+
+        if self.client:
+            self.conn = maybe_ref(self.client)
+        else:
+            self.conn = self.r.connect(db=self.database, **self.connect_args)
+
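+        # First-use bootstrap: create the database, table and index if they do not exist yet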
+        if self.database not in self.r.db_list().run(self.conn):
+            self.r.db_create(self.database).run(self.conn)
+
+        if self.table_name not in self.r.table_list().run(self.conn):
+            self.r.table_create(self.table_name).run(self.conn)
+
+        if "next_run_time" not in self.r.table(self.table_name).index_list().run(
+            self.conn
+        ):
+            self.r.table(self.table_name).index_create("next_run_time").run(self.conn)
+
+        self.table = self.r.db(self.database).table(self.table_name)
+
+    def lookup_job(self, job_id):
+        results = list(self.table.get_all(job_id).pluck("job_state").run(self.conn))
+        return self._reconstitute_job(results[0]["job_state"]) if results else None
+
+    def get_due_jobs(self, now):
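+        # next_run_time is stored as a UTC epoch timestamp, so "now" is converted before comparing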
+        return self._get_jobs(
+            self.r.row["next_run_time"] <= datetime_to_utc_timestamp(now)
+        )
+
+    def get_next_run_time(self):
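+        # "!= None" is deliberate: ReQL overloads the operator to build a server-side
+        # filter, so a Python "is not None" test would not work here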
+        results = list(
+            self.table.filter(self.r.row["next_run_time"] != None)
+            .order_by(self.r.asc("next_run_time"))
+            .map(lambda x: x["next_run_time"])
+            .limit(1)
+            .run(self.conn)
+        )
+        return utc_timestamp_to_datetime(results[0]) if results else None
+
+    def get_all_jobs(self):
+        jobs = self._get_jobs()
+        self._fix_paused_jobs_sorting(jobs)
+        return jobs
+
+    def add_job(self, job):
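+        # Pickle the job state and wrap it in r.binary() so it is stored as raw bytes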
+        job_dict = {
+            "id": job.id,
+            "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+            "job_state": self.r.binary(
+                pickle.dumps(job.__getstate__(), self.pickle_protocol)
+            ),
+        }
+        results = self.table.insert(job_dict).run(self.conn)
+        if results["errors"] > 0:
+            raise ConflictingIdError(job.id)
+
+    def update_job(self, job):
+        changes = {
+            "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
+            "job_state": self.r.binary(
+                pickle.dumps(job.__getstate__(), self.pickle_protocol)
+            ),
+        }
+        results = self.table.get_all(job.id).update(changes).run(self.conn)
+        # An all-zero result means get_all() matched nothing, i.e. the job id is unknown
+        nothing_matched = all(results[key] == 0 for key in results)
+        if results["skipped"] > 0 or results["errors"] > 0 or nothing_matched:
+            raise JobLookupError(job.id)
+
+    def remove_job(self, job_id):
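+        # Exactly one document should be removed; any other count means the id was not found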
+        results = self.table.get_all(job_id).delete().run(self.conn)
+        if results["deleted"] + results["skipped"] != 1:
+            raise JobLookupError(job_id)
+
+    def remove_all_jobs(self):
+        self.table.delete().run(self.conn)
+
+    def shutdown(self):
+        self.conn.close()
+
+    def _reconstitute_job(self, job_state):
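+        # Rebuild a Job from its pickled state dict, bypassing __init__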
+        job_state = pickle.loads(job_state)
+        job = Job.__new__(Job)
+        job.__setstate__(job_state)
+        job._scheduler = self._scheduler
+        job._jobstore_alias = self._alias
+        return job
+
+    def _get_jobs(self, predicate=None):
+        jobs = []
+        failed_job_ids = []
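+        # Paused jobs have a null next_run_time and are excluded whenever a predicate is given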
+        query = (
+            self.table.filter(self.r.row["next_run_time"] != None).filter(predicate)
+            if predicate
+            else self.table
+        )
+        query = query.order_by("next_run_time", "id").pluck("id", "job_state")
+
+        for document in query.run(self.conn):
+            try:
+                jobs.append(self._reconstitute_job(document["job_state"]))
+            except Exception:
+                self._logger.exception(
+                    'Unable to restore job "%s" -- removing it', document["id"]
+                )
+                failed_job_ids.append(document["id"])
+
+        # Remove all the jobs we failed to restore
+        if failed_job_ids:
+            self.r.expr(failed_job_ids).for_each(
+                lambda job_id: self.table.get_all(job_id).delete()
+            ).run(self.conn)
+
+        return jobs
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} (connection={self.conn})>"