aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py33
1 files changed, 30 insertions, 3 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index dbeb9ce..908c244 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,19 +1,46 @@
from rq import Queue
+from rq.job import Job
from redis import Redis
from flask import current_app as app
def enqueue_job(delayed_fn, *args, **kwargs):
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+ job = Job.create(
+ delayed_fn, args, **{
+ key: val for key, val in kwargs.items()
+ if key != "additional_jobs_meta"
+ },
+ connection = rconn)
+ job.meta["status"] = "enqueued"
+ job.meta["progress"] = 0
+ if "additional_jobs_meta" in kwargs:
+ for key, val in kwargs["additional_jobs_meta"].items():
+ job.meta[key] = val
+
+ job.save_meta()
+
queue = Queue("qcapp_queue", connection=rconn)
- job = queue.enqueue(delayed_fn, *args, **kwargs)
+ queue.enqueue_job(job)
- job.meta["status"] = "enqueued"
- job.save_meta()
return job
def job(job_id):
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
queue = Queue("qcapp_queue", connection=rconn)
job = queue.fetch_job(job_id)
+ job.refresh()
+
+ return job
+
+def update_meta(stale_job, **kwargs):
+ with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+ queue = Queue("qcapp_queue", connection=rconn)
+ job = queue.fetch_job(stale_job.get_id())
+ job.refresh()
+ meta_dict = {**stale_job.meta, **job.meta, **kwargs}
+ for key, val in meta_dict.items():
+ job.meta[key] = val
+
+ job.save_meta()
return job