about summary refs log tree commit diff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py71
1 files changed, 53 insertions, 18 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index f613e61..afea419 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,9 +1,12 @@
+import jsonpickle
+import sqlite3
+
 from rq import Queue
 from rq.job import Job
 from redis import Redis
 from flask import current_app as app
 
-def enqueue_job(delayed_fn, *args, **kwargs):
+def enqueue_job(delayed_fn: str, *args, **kwargs):
     """Add job to queue"""
     with Redis.from_url(app.config["REDIS_URL"]) as rconn:
         job = Job.create(
@@ -11,21 +14,18 @@ def enqueue_job(delayed_fn, *args, **kwargs):
                 key: val for key, val in kwargs.items()
                 if key != "additional_jobs_meta"
             },
-            connection = rconn)
-        job.meta["status"] = "enqueued"
-        job.meta["progress"] = 0
-        if "additional_jobs_meta" in kwargs:
-            for key, val in kwargs["additional_jobs_meta"].items():
-                job.meta[key] = val
-
-        job.save_meta()
+            connection = rconn,
+            timeout = "2h",
+            result_ttl = "5h",
+            failure_ttl = "5h"
+        )
 
         queue = Queue("qcapp_queue", connection=rconn)
         queue.enqueue_job(job)
 
     return job
 
-def job(job_id):
+def job(job_id: str):
     "Retrieve the job"
     with Redis.from_url(app.config["REDIS_URL"]) as rconn:
         queue = Queue("qcapp_queue", connection=rconn)
@@ -34,14 +34,49 @@ def job(job_id):
 
     return job
 
-def update_meta(rconn, stale_job, **kwargs):
+def create_jobs_table(dbpath: str):
+    """Create sqlite3 table to track job progress"""
+    conn = sqlite3.connect(dbpath)
+    cursor = conn.cursor()
+    cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
+    cursor.close()
+    conn.close()
+
+def retrieve_meta(conn, job_id: str):
+    """Retrieve the job's metadata."""
+    meta = {}
+    try:
+        cursor = conn.cursor()
+        cursor.execute(
+            "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
+        db_row = cursor.fetchone()
+        # meta = json.loads(db_row [1]) if db_row else {}
+        meta = jsonpickle.decode(db_row [1]) if db_row else {}
+    except Exception as exc:
+        cursor.close()
+        raise exc
+    finally:
+        cursor.close()
+
+    return meta
+
+def update_meta(conn, job_id: str, **meta):
     """Update the job's metadata."""
-    job = Job.fetch(stale_job.get_id(), connection=rconn)
-    job.refresh()
-    meta_dict = {**stale_job.meta, **job.meta, **kwargs}
-    for key, val in meta_dict.items():
-        job.meta[key] = val
+    try:
+        cursor = conn.cursor()
+        old_meta = retrieve_meta(conn, job_id)
+        meta = {**old_meta, **meta}
+        query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
+        if not bool(old_meta):
+            query = "INSERT INTO jobs VALUES (:job_id, :meta)"
 
-    job.save_meta()
+        cursor.execute(
+            query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
+        conn.commit()
+    except Exception as exc:
+        cursor.close()
+        raise exc
+    finally:
+        cursor.close()
 
-    return job
+    return meta