about summary refs log tree commit diff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-04-26 15:18:26 +0300
committerFrederick Muriuki Muriithi2022-04-26 15:18:26 +0300
commitea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d (patch)
treef4dc7e29b8e49a1cbf2412d0bcab55294957e021 /qc_app/jobs.py
parente6895f5bac672d2e1d2a04fe8118fa55c3a40b91 (diff)
downloadgn-uploader-ea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d.tar.gz
Fix issues caught processing the jobs
* Create and push the application context for the worker functions
* Fix the update of meta fields
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py33
1 files changed, 30 insertions, 3 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index dbeb9ce..908c244 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,19 +1,46 @@
 from rq import Queue
+from rq.job import Job
 from redis import Redis
 from flask import current_app as app
 
 def enqueue_job(delayed_fn, *args, **kwargs):
     with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+        job = Job.create(
+            delayed_fn, args, **{
+                key: val for key, val in kwargs.items()
+                if key != "additional_jobs_meta"
+            },
+            connection = rconn)
+        job.meta["status"] = "enqueued"
+        job.meta["progress"] = 0
+        if "additional_jobs_meta" in kwargs:
+            for key, val in kwargs["additional_jobs_meta"].items():
+                job.meta[key] = val
+
+        job.save_meta()
+
         queue = Queue("qcapp_queue", connection=rconn)
-        job =  queue.enqueue(delayed_fn, *args, **kwargs)
+        queue.enqueue_job(job)
 
-    job.meta["status"] = "enqueued"
-    job.save_meta()
     return job
 
 def job(job_id):
     with Redis.from_url(app.config["REDIS_URL"]) as rconn:
         queue = Queue("qcapp_queue", connection=rconn)
         job = queue.fetch_job(job_id)
+        job.refresh()
+
+    return job
+
+def update_meta(stale_job, **kwargs):
+    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+        queue = Queue("qcapp_queue", connection=rconn)
+        job = queue.fetch_job(stale_job.get_id())
+        job.refresh()
+        meta_dict = {**stale_job.meta, **job.meta, **kwargs}
+        for key, val in meta_dict.items():
+            job.meta[key] = val
+
+        job.save_meta()
 
     return job