diff options
author | Frederick Muriuki Muriithi | 2022-04-26 15:18:26 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-04-26 15:18:26 +0300 |
commit | ea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d (patch) | |
tree | f4dc7e29b8e49a1cbf2412d0bcab55294957e021 /qc_app/jobs.py | |
parent | e6895f5bac672d2e1d2a04fe8118fa55c3a40b91 (diff) | |
download | gn-uploader-ea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d.tar.gz |
Fix issues caught processing the jobs
* Create and push the application context for the worker functions
* Fix the update of meta fields
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r-- | qc_app/jobs.py | 33 |
1 files changed, 30 insertions, 3 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py index dbeb9ce..908c244 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -1,19 +1,46 @@ from rq import Queue +from rq.job import Job from redis import Redis from flask import current_app as app def enqueue_job(delayed_fn, *args, **kwargs): with Redis.from_url(app.config["REDIS_URL"]) as rconn: + job = Job.create( + delayed_fn, args, **{ + key: val for key, val in kwargs.items() + if key != "additional_jobs_meta" + }, + connection = rconn) + job.meta["status"] = "enqueued" + job.meta["progress"] = 0 + if "additional_jobs_meta" in kwargs: + for key, val in kwargs["additional_jobs_meta"].items(): + job.meta[key] = val + + job.save_meta() + queue = Queue("qcapp_queue", connection=rconn) - job = queue.enqueue(delayed_fn, *args, **kwargs) + queue.enqueue_job(job) - job.meta["status"] = "enqueued" - job.save_meta() return job def job(job_id): with Redis.from_url(app.config["REDIS_URL"]) as rconn: queue = Queue("qcapp_queue", connection=rconn) job = queue.fetch_job(job_id) + job.refresh() + + return job + +def update_meta(stale_job, **kwargs): + with Redis.from_url(app.config["REDIS_URL"]) as rconn: + queue = Queue("qcapp_queue", connection=rconn) + job = queue.fetch_job(stale_job.get_id()) + job.refresh() + meta_dict = {**stale_job.meta, **job.meta, **kwargs} + for key, val in meta_dict.items(): + job.meta[key] = val + + job.save_meta() return job |