aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py71
1 files changed, 53 insertions, 18 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index f613e61..afea419 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,9 +1,12 @@
+import jsonpickle
+import sqlite3
+
from rq import Queue
from rq.job import Job
from redis import Redis
from flask import current_app as app
-def enqueue_job(delayed_fn, *args, **kwargs):
+def enqueue_job(delayed_fn: str, *args, **kwargs):
"""Add job to queue"""
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
job = Job.create(
@@ -11,21 +14,18 @@ def enqueue_job(delayed_fn, *args, **kwargs):
key: val for key, val in kwargs.items()
if key != "additional_jobs_meta"
},
- connection = rconn)
- job.meta["status"] = "enqueued"
- job.meta["progress"] = 0
- if "additional_jobs_meta" in kwargs:
- for key, val in kwargs["additional_jobs_meta"].items():
- job.meta[key] = val
-
- job.save_meta()
+ connection = rconn,
+ timeout = "2h",
+ result_ttl = "5h",
+ failure_ttl = "5h"
+ )
queue = Queue("qcapp_queue", connection=rconn)
queue.enqueue_job(job)
return job
-def job(job_id):
+def job(job_id: str):
"Retrieve the job"
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
queue = Queue("qcapp_queue", connection=rconn)
@@ -34,14 +34,49 @@ def job(job_id):
return job
-def update_meta(rconn, stale_job, **kwargs):
+def create_jobs_table(dbpath: str):
+ """Create sqlite3 table to track job progress"""
+ conn = sqlite3.connect(dbpath)
+ cursor = conn.cursor()
+ cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
+ cursor.close()
+ conn.close()
+
+def retrieve_meta(conn, job_id: str):
+ """Retrieve the job's metadata."""
+ meta = {}
+ try:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
+ db_row = cursor.fetchone()
+ # meta = json.loads(db_row [1]) if db_row else {}
+ meta = jsonpickle.decode(db_row [1]) if db_row else {}
+ except Exception as exc:
+ cursor.close()
+ raise exc
+ finally:
+ cursor.close()
+
+ return meta
+
+def update_meta(conn, job_id: str, **meta):
"""Update the job's metadata."""
- job = Job.fetch(stale_job.get_id(), connection=rconn)
- job.refresh()
- meta_dict = {**stale_job.meta, **job.meta, **kwargs}
- for key, val in meta_dict.items():
- job.meta[key] = val
+ try:
+ cursor = conn.cursor()
+ old_meta = retrieve_meta(conn, job_id)
+ meta = {**old_meta, **meta}
+ query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
+ if not bool(old_meta):
+ query = "INSERT INTO jobs VALUES (:job_id, :meta)"
- job.save_meta()
+ cursor.execute(
+ query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
+ conn.commit()
+ except Exception as exc:
+ cursor.close()
+ raise exc
+ finally:
+ cursor.close()
- return job
+ return meta