From ea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Tue, 26 Apr 2022 15:18:26 +0300 Subject: Fix issues caught processing the jobs * Create and push the application context for the worker functions * Fix the update of meta fields --- qc_app/jobs.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) (limited to 'qc_app/jobs.py') diff --git a/qc_app/jobs.py b/qc_app/jobs.py index dbeb9ce..908c244 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -1,19 +1,46 @@ from rq import Queue +from rq.job import Job from redis import Redis from flask import current_app as app def enqueue_job(delayed_fn, *args, **kwargs): with Redis.from_url(app.config["REDIS_URL"]) as rconn: + job = Job.create( + delayed_fn, args, **{ + key: val for key, val in kwargs.items() + if key != "additional_jobs_meta" + }, + connection = rconn) + job.meta["status"] = "enqueued" + job.meta["progress"] = 0 + if "additional_jobs_meta" in kwargs: + for key, val in kwargs["additional_jobs_meta"].items(): + job.meta[key] = val + + job.save_meta() + queue = Queue("qcapp_queue", connection=rconn) - job = queue.enqueue(delayed_fn, *args, **kwargs) + queue.enqueue_job(job) - job.meta["status"] = "enqueued" - job.save_meta() return job def job(job_id): with Redis.from_url(app.config["REDIS_URL"]) as rconn: queue = Queue("qcapp_queue", connection=rconn) job = queue.fetch_job(job_id) + job.refresh() + + return job + +def update_meta(stale_job, **kwargs): + with Redis.from_url(app.config["REDIS_URL"]) as rconn: + queue = Queue("qcapp_queue", connection=rconn) + job = queue.fetch_job(stale_job.get_id()) + job.refresh() + meta_dict = {**stale_job.meta, **job.meta, **kwargs} + for key, val in meta_dict.items(): + job.meta[key] = val + + job.save_meta() return job -- cgit v1.2.3