about summary refs log tree commit diff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py96
1 files changed, 23 insertions, 73 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index afea419..4d3dba6 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,82 +1,32 @@
-import jsonpickle
-import sqlite3
+import os
+import shlex
+import subprocess
+from uuid import uuid4
 
-from rq import Queue
-from rq.job import Job
 from redis import Redis
-from flask import current_app as app
 
-def enqueue_job(delayed_fn: str, *args, **kwargs):
-    """Add job to queue"""
-    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
-        job = Job.create(
-            delayed_fn, args, **{
-                key: val for key, val in kwargs.items()
-                if key != "additional_jobs_meta"
-            },
-            connection = rconn,
-            timeout = "2h",
-            result_ttl = "5h",
-            failure_ttl = "5h"
-        )
+def error_filename(job_id, error_dir):
+    return f"{error_dir}/job_{job_id}.error"
 
-        queue = Queue("qcapp_queue", connection=rconn)
-        queue.enqueue_job(job)
+def launch_job(redis_conn: Redis, filepath, filetype, redisurl, error_dir):
+    """Launch a job in the background"""
+    job_id = str(uuid4())
+    command = [
+        "python3", "-m" "scripts.worker", filetype, filepath, redisurl, job_id]
+    job = {
+        "job_id": job_id, "command": shlex.join(command), "status": "pending",
+        "filename": os.path.basename(filepath), "percent": 0
+    }
+    redis_conn.hset(name=job["job_id"], mapping=job)
 
-    return job
+    if not os.path.exists(error_dir):
+        os.mkdir(error_dir)
 
-def job(job_id: str):
-    "Retrieve the job"
-    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
-        queue = Queue("qcapp_queue", connection=rconn)
-        job = queue.fetch_job(job_id)
-        job.refresh()
+    with open(error_filename(job_id, error_dir), "w") as errorfile:
+        subprocess.Popen(command, stderr=errorfile)
 
     return job
 
-def create_jobs_table(dbpath: str):
-    """Create sqlite3 table to track job progress"""
-    conn = sqlite3.connect(dbpath)
-    cursor = conn.cursor()
-    cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
-    cursor.close()
-    conn.close()
-
-def retrieve_meta(conn, job_id: str):
-    """Retrieve the job's metadata."""
-    meta = {}
-    try:
-        cursor = conn.cursor()
-        cursor.execute(
-            "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
-        db_row = cursor.fetchone()
-        # meta = json.loads(db_row [1]) if db_row else {}
-        meta = jsonpickle.decode(db_row [1]) if db_row else {}
-    except Exception as exc:
-        cursor.close()
-        raise exc
-    finally:
-        cursor.close()
-
-    return meta
-
-def update_meta(conn, job_id: str, **meta):
-    """Update the job's metadata."""
-    try:
-        cursor = conn.cursor()
-        old_meta = retrieve_meta(conn, job_id)
-        meta = {**old_meta, **meta}
-        query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
-        if not bool(old_meta):
-            query = "INSERT INTO jobs VALUES (:job_id, :meta)"
-
-        cursor.execute(
-            query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
-        conn.commit()
-    except Exception as exc:
-        cursor.close()
-        raise exc
-    finally:
-        cursor.close()
-
-    return meta
+def job(redis_conn, job_id: str):
+    "Retrieve the job"
+    return redis_conn.hgetall(job_id)