diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py new file mode 100644 index 00000000..528285fb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/jobstores/redis.py @@ -0,0 +1,160 @@ +import pickle +from datetime import datetime, timezone + +from apscheduler.job import Job +from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError +from apscheduler.util import datetime_to_utc_timestamp, utc_timestamp_to_datetime + +try: + from redis import Redis +except ImportError as exc: # pragma: nocover + raise ImportError("RedisJobStore requires redis installed") from exc + + +class RedisJobStore(BaseJobStore): + """ + Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's + :class:`~redis.StrictRedis`. + + Plugin alias: ``redis`` + + :param int db: the database number to store jobs in + :param str jobs_key: key to store jobs in + :param str run_times_key: key to store the jobs' run times in + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__( + self, + db=0, + jobs_key="apscheduler.jobs", + run_times_key="apscheduler.run_times", + pickle_protocol=pickle.HIGHEST_PROTOCOL, + **connect_args, + ): + super().__init__() + + if db is None: + raise ValueError('The "db" parameter must not be empty') + if not jobs_key: + raise ValueError('The "jobs_key" parameter must not be empty') + if not run_times_key: + raise ValueError('The "run_times_key" parameter must not be empty') + + self.pickle_protocol = pickle_protocol + self.jobs_key = jobs_key + self.run_times_key = run_times_key + self.redis = Redis(db=int(db), **connect_args) + + def lookup_job(self, job_id): + job_state = self.redis.hget(self.jobs_key, job_id) + return self._reconstitute_job(job_state) if job_state else None + + def get_due_jobs(self, now): + timestamp = datetime_to_utc_timestamp(now) + job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp) + if job_ids: + job_states = self.redis.hmget(self.jobs_key, *job_ids) + return self._reconstitute_jobs(zip(job_ids, job_states)) + return [] + + def get_next_run_time(self): + next_run_time = self.redis.zrange(self.run_times_key, 0, 0, withscores=True) + if next_run_time: + return utc_timestamp_to_datetime(next_run_time[0][1]) + + def get_all_jobs(self): + job_states = self.redis.hgetall(self.jobs_key) + jobs = self._reconstitute_jobs(job_states.items()) + paused_sort_key = datetime(9999, 12, 31, tzinfo=timezone.utc) + return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key) + + def add_job(self, job): + if self.redis.hexists(self.jobs_key, job.id): + raise ConflictingIdError(job.id) + + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.hset( + self.jobs_key, + job.id, + pickle.dumps(job.__getstate__(), self.pickle_protocol), + ) + if job.next_run_time: + pipe.zadd( + self.run_times_key, + {job.id: datetime_to_utc_timestamp(job.next_run_time)}, + ) + + pipe.execute() + + def update_job(self, job): + if not self.redis.hexists(self.jobs_key, job.id): + raise JobLookupError(job.id) + + with self.redis.pipeline() as pipe: + pipe.hset( + self.jobs_key, + job.id, + pickle.dumps(job.__getstate__(), self.pickle_protocol), + ) + if job.next_run_time: + pipe.zadd( + self.run_times_key, + {job.id: datetime_to_utc_timestamp(job.next_run_time)}, + ) + else: + pipe.zrem(self.run_times_key, job.id) + + pipe.execute() + + def remove_job(self, job_id): + if not self.redis.hexists(self.jobs_key, job_id): + raise JobLookupError(job_id) + + with self.redis.pipeline() as pipe: + pipe.hdel(self.jobs_key, job_id) + pipe.zrem(self.run_times_key, job_id) + pipe.execute() + + def remove_all_jobs(self): + with self.redis.pipeline() as pipe: + pipe.delete(self.jobs_key) + pipe.delete(self.run_times_key) + pipe.execute() + + def shutdown(self): + self.redis.connection_pool.disconnect() + + def _reconstitute_job(self, job_state): + job_state = pickle.loads(job_state) + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _reconstitute_jobs(self, job_states): + jobs = [] + failed_job_ids = [] + for job_id, job_state in job_states: + try: + jobs.append(self._reconstitute_job(job_state)) + except BaseException: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', job_id + ) + failed_job_ids.append(job_id) + + # Remove all the jobs we failed to restore + if failed_job_ids: + with self.redis.pipeline() as pipe: + pipe.hdel(self.jobs_key, *failed_job_ids) + pipe.zrem(self.run_times_key, *failed_job_ids) + pipe.execute() + + return jobs + + def __repr__(self): + return f"<{self.__class__.__name__}>" |