From 5632dcab27058875de99d63cbd263acfa3a9a2d5 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 28 Apr 2022 15:32:35 +0300 Subject: Use sqlite3 to save the jobs metadata * Use sqlite to save the jobs metadata and enable UI update of the progress for large files --- qc_app/jobs.py | 71 +++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 18 deletions(-) (limited to 'qc_app/jobs.py') diff --git a/qc_app/jobs.py b/qc_app/jobs.py index f613e61..afea419 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -1,9 +1,12 @@ +import jsonpickle +import sqlite3 + from rq import Queue from rq.job import Job from redis import Redis from flask import current_app as app -def enqueue_job(delayed_fn, *args, **kwargs): +def enqueue_job(delayed_fn: str, *args, **kwargs): """Add job to queue""" with Redis.from_url(app.config["REDIS_URL"]) as rconn: job = Job.create( @@ -11,21 +14,18 @@ def enqueue_job(delayed_fn, *args, **kwargs): key: val for key, val in kwargs.items() if key != "additional_jobs_meta" }, - connection = rconn) - job.meta["status"] = "enqueued" - job.meta["progress"] = 0 - if "additional_jobs_meta" in kwargs: - for key, val in kwargs["additional_jobs_meta"].items(): - job.meta[key] = val - - job.save_meta() + connection = rconn, + timeout = "2h", + result_ttl = "5h", + failure_ttl = "5h" + ) queue = Queue("qcapp_queue", connection=rconn) queue.enqueue_job(job) return job -def job(job_id): +def job(job_id: str): "Retrieve the job" with Redis.from_url(app.config["REDIS_URL"]) as rconn: queue = Queue("qcapp_queue", connection=rconn) @@ -34,14 +34,49 @@ def job(job_id): return job -def update_meta(rconn, stale_job, **kwargs): +def create_jobs_table(dbpath: str): + """Create sqlite3 table to track job progress""" + conn = sqlite3.connect(dbpath) + cursor = conn.cursor() + cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)") + cursor.close() + conn.close() + +def retrieve_meta(conn, job_id: str): + """Retrieve the job's metadata.""" + meta = {} + try: + cursor = conn.cursor() + cursor.execute( + "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id}) + db_row = cursor.fetchone() + # meta = json.loads(db_row [1]) if db_row else {} + meta = jsonpickle.decode(db_row [1]) if db_row else {} + except Exception as exc: + cursor.close() + raise exc + finally: + cursor.close() + + return meta + +def update_meta(conn, job_id: str, **meta): """Update the job's metadata.""" - job = Job.fetch(stale_job.get_id(), connection=rconn) - job.refresh() - meta_dict = {**stale_job.meta, **job.meta, **kwargs} - for key, val in meta_dict.items(): - job.meta[key] = val + try: + cursor = conn.cursor() + old_meta = retrieve_meta(conn, job_id) + meta = {**old_meta, **meta} + query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id" + if not bool(old_meta): + query = "INSERT INTO jobs VALUES (:job_id, :meta)" - job.save_meta() + cursor.execute( + query, {"job_id": job_id, "meta": jsonpickle.encode(meta)}) + conn.commit() + except Exception as exc: + cursor.close() + raise exc + finally: + cursor.close() - return job + return meta -- cgit v1.2.3