about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py')
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py160
1 files changed, 160 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py
new file mode 100644
index 00000000..528285fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py
@@ -0,0 +1,160 @@
+import pickle
+from datetime import datetime, timezone
+
+from apscheduler.job import Job
+from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
+from apscheduler.util import datetime_to_utc_timestamp, utc_timestamp_to_datetime
+
+try:
+    from redis import Redis
+except ImportError as exc:  # pragma: nocover
+    raise ImportError("RedisJobStore requires redis installed") from exc
+
+
+class RedisJobStore(BaseJobStore):
+    """
+    Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's
+    :class:`~redis.StrictRedis`.
+
+    Plugin alias: ``redis``
+
+    :param int db: the database number to store jobs in
+    :param str jobs_key: key to store jobs in
+    :param str run_times_key: key to store the jobs' run times in
+    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+        highest available
+    """
+
+    def __init__(
+        self,
+        db=0,
+        jobs_key="apscheduler.jobs",
+        run_times_key="apscheduler.run_times",
+        pickle_protocol=pickle.HIGHEST_PROTOCOL,
+        **connect_args,
+    ):
+        super().__init__()
+
+        if db is None:
+            raise ValueError('The "db" parameter must not be empty')
+        if not jobs_key:
+            raise ValueError('The "jobs_key" parameter must not be empty')
+        if not run_times_key:
+            raise ValueError('The "run_times_key" parameter must not be empty')
+
+        self.pickle_protocol = pickle_protocol
+        self.jobs_key = jobs_key
+        self.run_times_key = run_times_key
+        self.redis = Redis(db=int(db), **connect_args)
+
+    def lookup_job(self, job_id):
+        job_state = self.redis.hget(self.jobs_key, job_id)
+        return self._reconstitute_job(job_state) if job_state else None
+
+    def get_due_jobs(self, now):
+        timestamp = datetime_to_utc_timestamp(now)
+        job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp)
+        if job_ids:
+            job_states = self.redis.hmget(self.jobs_key, *job_ids)
+            return self._reconstitute_jobs(zip(job_ids, job_states))
+        return []
+
+    def get_next_run_time(self):
+        next_run_time = self.redis.zrange(self.run_times_key, 0, 0, withscores=True)
+        if next_run_time:
+            return utc_timestamp_to_datetime(next_run_time[0][1])
+
+    def get_all_jobs(self):
+        job_states = self.redis.hgetall(self.jobs_key)
+        jobs = self._reconstitute_jobs(job_states.items())
+        paused_sort_key = datetime(9999, 12, 31, tzinfo=timezone.utc)
+        return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key)
+
+    def add_job(self, job):
+        if self.redis.hexists(self.jobs_key, job.id):
+            raise ConflictingIdError(job.id)
+
+        with self.redis.pipeline() as pipe:
+            pipe.multi()
+            pipe.hset(
+                self.jobs_key,
+                job.id,
+                pickle.dumps(job.__getstate__(), self.pickle_protocol),
+            )
+            if job.next_run_time:
+                pipe.zadd(
+                    self.run_times_key,
+                    {job.id: datetime_to_utc_timestamp(job.next_run_time)},
+                )
+
+            pipe.execute()
+
+    def update_job(self, job):
+        if not self.redis.hexists(self.jobs_key, job.id):
+            raise JobLookupError(job.id)
+
+        with self.redis.pipeline() as pipe:
+            pipe.hset(
+                self.jobs_key,
+                job.id,
+                pickle.dumps(job.__getstate__(), self.pickle_protocol),
+            )
+            if job.next_run_time:
+                pipe.zadd(
+                    self.run_times_key,
+                    {job.id: datetime_to_utc_timestamp(job.next_run_time)},
+                )
+            else:
+                pipe.zrem(self.run_times_key, job.id)
+
+            pipe.execute()
+
+    def remove_job(self, job_id):
+        if not self.redis.hexists(self.jobs_key, job_id):
+            raise JobLookupError(job_id)
+
+        with self.redis.pipeline() as pipe:
+            pipe.hdel(self.jobs_key, job_id)
+            pipe.zrem(self.run_times_key, job_id)
+            pipe.execute()
+
+    def remove_all_jobs(self):
+        with self.redis.pipeline() as pipe:
+            pipe.delete(self.jobs_key)
+            pipe.delete(self.run_times_key)
+            pipe.execute()
+
+    def shutdown(self):
+        self.redis.connection_pool.disconnect()
+
+    def _reconstitute_job(self, job_state):
+        job_state = pickle.loads(job_state)
+        job = Job.__new__(Job)
+        job.__setstate__(job_state)
+        job._scheduler = self._scheduler
+        job._jobstore_alias = self._alias
+        return job
+
+    def _reconstitute_jobs(self, job_states):
+        jobs = []
+        failed_job_ids = []
+        for job_id, job_state in job_states:
+            try:
+                jobs.append(self._reconstitute_job(job_state))
+            except BaseException:
+                self._logger.exception(
+                    'Unable to restore job "%s" -- removing it', job_id
+                )
+                failed_job_ids.append(job_id)
+
+        # Remove all the jobs we failed to restore
+        if failed_job_ids:
+            with self.redis.pipeline() as pipe:
+                pipe.hdel(self.jobs_key, *failed_job_ids)
+                pipe.zrem(self.run_times_key, *failed_job_ids)
+                pipe.execute()
+
+        return jobs
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__}>"