aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-05-19 15:26:15 +0300
committerFrederick Muriuki Muriithi2022-05-19 15:30:20 +0300
commit2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c (patch)
tree9e0fdcc010925a9f6d5674d41991b6185aea54e3 /qc_app/jobs.py
parent27f6e9e28f2a3244bdd00336cf918de97b2ceed6 (diff)
downloadgn-uploader-2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c.tar.gz
Update Web-UI: use new error collection paradigm
- README.org: document how to run scripts manually - manifest.scm: remove python-rq as a dependency - qc_app/jobs.py: rework job launching and processing - qc_app/parse.py: use reworked job processing - qc_app/templates/job_progress.html: display progress correctly - qc_app/templates/parse_results.html: display final results - scripts/worker.py: new worker script
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py96
1 files changed, 23 insertions, 73 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index afea419..4d3dba6 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,82 +1,32 @@
-import jsonpickle
-import sqlite3
+import os
+import shlex
+import subprocess
+from uuid import uuid4
-from rq import Queue
-from rq.job import Job
from redis import Redis
-from flask import current_app as app
-def enqueue_job(delayed_fn: str, *args, **kwargs):
- """Add job to queue"""
- with Redis.from_url(app.config["REDIS_URL"]) as rconn:
- job = Job.create(
- delayed_fn, args, **{
- key: val for key, val in kwargs.items()
- if key != "additional_jobs_meta"
- },
- connection = rconn,
- timeout = "2h",
- result_ttl = "5h",
- failure_ttl = "5h"
- )
+def error_filename(job_id, error_dir):
+ return f"{error_dir}/job_{job_id}.error"
- queue = Queue("qcapp_queue", connection=rconn)
- queue.enqueue_job(job)
+def launch_job(redis_conn: Redis, filepath, filetype, redisurl, error_dir):
+ """Launch a job in the background"""
+ job_id = str(uuid4())
+ command = [
+ "python3", "-m" "scripts.worker", filetype, filepath, redisurl, job_id]
+ job = {
+ "job_id": job_id, "command": shlex.join(command), "status": "pending",
+ "filename": os.path.basename(filepath), "percent": 0
+ }
+ redis_conn.hset(name=job["job_id"], mapping=job)
- return job
+ if not os.path.exists(error_dir):
+ os.mkdir(error_dir)
-def job(job_id: str):
- "Retrieve the job"
- with Redis.from_url(app.config["REDIS_URL"]) as rconn:
- queue = Queue("qcapp_queue", connection=rconn)
- job = queue.fetch_job(job_id)
- job.refresh()
+ with open(error_filename(job_id, error_dir), "w") as errorfile:
+ subprocess.Popen(command, stderr=errorfile)
return job
-def create_jobs_table(dbpath: str):
- """Create sqlite3 table to track job progress"""
- conn = sqlite3.connect(dbpath)
- cursor = conn.cursor()
- cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
- cursor.close()
- conn.close()
-
-def retrieve_meta(conn, job_id: str):
- """Retrieve the job's metadata."""
- meta = {}
- try:
- cursor = conn.cursor()
- cursor.execute(
- "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
- db_row = cursor.fetchone()
- # meta = json.loads(db_row [1]) if db_row else {}
- meta = jsonpickle.decode(db_row [1]) if db_row else {}
- except Exception as exc:
- cursor.close()
- raise exc
- finally:
- cursor.close()
-
- return meta
-
-def update_meta(conn, job_id: str, **meta):
- """Update the job's metadata."""
- try:
- cursor = conn.cursor()
- old_meta = retrieve_meta(conn, job_id)
- meta = {**old_meta, **meta}
- query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
- if not bool(old_meta):
- query = "INSERT INTO jobs VALUES (:job_id, :meta)"
-
- cursor.execute(
- query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
- conn.commit()
- except Exception as exc:
- cursor.close()
- raise exc
- finally:
- cursor.close()
-
- return meta
+def job(redis_conn, job_id: str):
+ "Retrieve the job"
+ return redis_conn.hgetall(job_id)