aboutsummaryrefslogtreecommitdiff
path: root/qc_app
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-04-28 15:32:35 +0300
committerFrederick Muriuki Muriithi2022-04-28 15:32:35 +0300
commit5632dcab27058875de99d63cbd263acfa3a9a2d5 (patch)
tree93cce3204086c14760c3e76497b8b106100b4a96 /qc_app
parentdefc1cf0c1635f3262200a9ba25d8bd0c6fc0a93 (diff)
downloadgn-uploader-5632dcab27058875de99d63cbd263acfa3a9a2d5.tar.gz
Use sqlite3 to save the jobs metadata
* Use sqlite to save the jobs metadata and enable UI update of the progress for large files
Diffstat (limited to 'qc_app')
-rw-r--r--qc_app/jobs.py71
-rw-r--r--qc_app/parse.py217
2 files changed, 192 insertions, 96 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index f613e61..afea419 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,9 +1,12 @@
+import jsonpickle
+import sqlite3
+
from rq import Queue
from rq.job import Job
from redis import Redis
from flask import current_app as app
-def enqueue_job(delayed_fn, *args, **kwargs):
+def enqueue_job(delayed_fn: str, *args, **kwargs):
"""Add job to queue"""
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
job = Job.create(
@@ -11,21 +14,18 @@ def enqueue_job(delayed_fn, *args, **kwargs):
key: val for key, val in kwargs.items()
if key != "additional_jobs_meta"
},
- connection = rconn)
- job.meta["status"] = "enqueued"
- job.meta["progress"] = 0
- if "additional_jobs_meta" in kwargs:
- for key, val in kwargs["additional_jobs_meta"].items():
- job.meta[key] = val
-
- job.save_meta()
+ connection = rconn,
+ timeout = "2h",
+ result_ttl = "5h",
+ failure_ttl = "5h"
+ )
queue = Queue("qcapp_queue", connection=rconn)
queue.enqueue_job(job)
return job
-def job(job_id):
+def job(job_id: str):
"Retrieve the job"
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
queue = Queue("qcapp_queue", connection=rconn)
@@ -34,14 +34,49 @@ def job(job_id):
return job
-def update_meta(rconn, stale_job, **kwargs):
+def create_jobs_table(dbpath: str):
+ """Create sqlite3 table to track job progress"""
+ conn = sqlite3.connect(dbpath)
+ cursor = conn.cursor()
+ cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
+ cursor.close()
+ conn.close()
+
+def retrieve_meta(conn, job_id: str):
+ """Retrieve the job's metadata."""
+ meta = {}
+ try:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
+ db_row = cursor.fetchone()
+ # meta = json.loads(db_row [1]) if db_row else {}
+ meta = jsonpickle.decode(db_row [1]) if db_row else {}
+ except Exception as exc:
+ cursor.close()
+ raise exc
+ finally:
+ cursor.close()
+
+ return meta
+
+def update_meta(conn, job_id: str, **meta):
"""Update the job's metadata."""
- job = Job.fetch(stale_job.get_id(), connection=rconn)
- job.refresh()
- meta_dict = {**stale_job.meta, **job.meta, **kwargs}
- for key, val in meta_dict.items():
- job.meta[key] = val
+ try:
+ cursor = conn.cursor()
+ old_meta = retrieve_meta(conn, job_id)
+ meta = {**old_meta, **meta}
+ query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
+ if not bool(old_meta):
+ query = "INSERT INTO jobs VALUES (:job_id, :meta)"
- job.save_meta()
+ cursor.execute(
+ query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
+ conn.commit()
+ except Exception as exc:
+ cursor.close()
+ raise exc
+ finally:
+ cursor.close()
- return job
+ return meta
diff --git a/qc_app/parse.py b/qc_app/parse.py
index 3398918..795cc01 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -1,5 +1,6 @@
"""File parsing module"""
import os
+import sqlite3
from functools import reduce
from redis import Redis
@@ -23,32 +24,41 @@ from quality_control.parsing import (
parsebp = Blueprint("parse", __name__)
-def queued_parse(filepath, filetype, strainsfile, redis_url):
+def queued_parse(
+ filepath: str, filetype: FileType, strainsfile: str, redis_url: str,
+ dbpath: str):
job = get_current_job()
+ job_id = job.get_id()
with Redis.from_url(redis_url) as rconn:
- jobs.update_meta(rconn, job, status = "in-progress", progress = 0)
- job.save_meta()
+ dbconn = sqlite3.connect(dbpath)
try:
+ job_meta = jobs.update_meta(
+ dbconn, job_id, status = "in-progress", progress = 0)
parsed = parse_file(
filepath, filetype, strain_names(parse_strains(strainsfile)))
for line, curr_size in parsed:
- jobs.update_meta(
- rconn, job, progress = (curr_size/job.meta["filesize"]) * 100,
+ job_meta = jobs.update_meta(
+ dbconn, job_id,
+ progress=((curr_size/job_meta["filesize"]) * 100),
message = f"Parsed {curr_size} bytes")
- print(f"Progress: {curr_size} bytes: {(curr_size/job.meta['filesize']) * 100}%")
os.remove(filepath)
- jobs.update_meta(
- rconn, job, progress = 10, status = "success",
+ jobs_meta = jobs.update_meta(
+ dbconn, job_id, progress = 100, status = "success",
message = "no errors found")
except ParseError as pe:
pe_dict = pe.args[0]
- jobs.update_meta(
- rconn, job, status = "parse-error", results = {
+ job_meta = jobs.update_meta(
+ dbconn, job_id, status = "parse-error", results = {
"filename": os.path.basename(filepath), "filetype": filetype,
"position": pe_dict["position"],
"line_number": pe_dict["line_number"]
})
+ finally:
+ dbconn.close()
+
+def retrieve_dbpath():
+ return os.path.join(app.config["UPLOAD_FOLDER"], "jobs.db")
@parsebp.route("/parse", methods=["GET"])
def parse():
@@ -73,94 +83,145 @@ def parse():
if errors:
return redirect(url_for("entry.index"))
+ jobs.create_jobs_table(retrieve_dbpath())
filetype = (
FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR)
job = jobs.enqueue_job(
"qc_app.parse.queued_parse", filepath, filetype,
os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"],
- additional_jobs_meta = {
- "filename": filename, "filesize": os.stat(filepath).st_size})
+ retrieve_dbpath())
+ try:
+ dbconn = sqlite3.connect(retrieve_dbpath())
+ jobs.update_meta(
+ dbconn, job.get_id(), filename=filename, filesize=os.stat(filepath).st_size,
+ status="enqueued", progress=0)
+ except Exception as exc:
+ import traceback
+ print(traceback.format_exc())
+ dbconn.rollback()
+ finally:
+ dbconn.close()
+
return redirect(url_for("parse.parse_status", job_id=job.get_id()))
@parsebp.route("/status/<job_id>", methods=["GET"])
-def parse_status(job_id):
- job = jobs.job(job_id)
- if job:
- job_id = job.get_id()
- progress = job.meta["progress"]
- status = job.meta["status"]
- filename = job.meta.get("filename", "uploaded file")
- if status == "success":
- return redirect(url_for("parse.results", job_id=job_id))
-
- if status == "parse-error":
- return redirect(url_for("parse.fail", job_id=job_id))
-
- return render_template(
- "job_progress.html",
- job_id = job_id,
- job_status = status,
- progress = progress,
- message = job.meta.get("message", ""),
- job_name = f"Parsing '{filename}'")
-
- return render_template("no_such_job.html", job_id=job_id)
+def parse_status(job_id: str):
+ try:
+ dbconn = sqlite3.connect(retrieve_dbpath(), timeout=10)
+ job = jobs.job(job_id)
+ if job:
+ job_id = job.get_id()
+ job_meta = jobs.retrieve_meta(dbconn, job_id)
+ progress = job_meta["progress"]
+ status = job_meta["status"]
+ filename = job_meta.get("filename", "uploaded file")
+ if status == "success":
+ return redirect(url_for("parse.results", job_id=job_id))
+
+ if status == "parse-error":
+ return redirect(url_for("parse.fail", job_id=job_id))
+
+ return render_template(
+ "job_progress.html",
+ job_id = job_id,
+ job_status = status,
+ progress = progress,
+ message = job_meta.get("message", ""),
+ job_name = f"Parsing '{filename}'")
+
+ return render_template("no_such_job.html", job_id=job_id)
+ except sqlite3.OperationalError:
+ return redirect(url_for("parse.parse_status", job_id=job_id))
+ except Exception as exc:
+ import traceback
+ print(traceback.format_exc())
+ return exc
+ finally:
+ dbconn.close()
@parsebp.route("/results/<job_id>", methods=["GET"])
-def results(job_id):
+def results(job_id: str):
"""Show results of parsing..."""
job = jobs.job(job_id)
if job:
- filename = job.meta["filename"]
- errors = job.meta.get("errors", [])
- return render_template(
- "parse_results.html",
- errors=errors,
- job_name = f"Parsing '{filename}'",
- starting_line_number=job.meta.get("starting_line_number", 0))
+ try:
+ dbconn = sqlite3.connect(retrieve_dbpath())
+ job_meta = jobs.retrieve_meta(dbconn, job_id)
+ filename = job_meta["filename"]
+ errors = job_meta.get("errors", [])
+ return render_template(
+ "parse_results.html",
+ errors=errors,
+ job_name = f"Parsing '{filename}'",
+ starting_line_number=job_meta.get("starting_line_number", 0))
+ except Exception as exc:
+ import traceback
+ print(traceback.format_exc())
+ finally:
+ dbconn.close()
return render_template("no_such_job.html", job_id=job_id)
-def queued_collect_errors(filepath, filetype, strainsfile, redis_url, seek_pos=0):
+def queued_collect_errors(
+ filepath: str, filetype: FileType, strainsfile: str, redis_url: str,
+ dbpath: str, seek_pos: int = 0):
job = get_current_job()
+ job_id = job.get_id()
errors = []
count = 0
with Redis.from_url(redis_url) as rconn:
- for error in parse_errors(
- filepath, filetype, strain_names(parse_strains(strainsfile)),
- seek_pos):
- count = count + 1
- jobs.update_meta(
- rconn, job, message = f"Collected {count} errors", progress = (
- (error["position"] / job.meta["filesize"]) * 100))
- errors.append(error)
-
- jobs.update_meta(
- rconn, job, errors = errors, progress = 100, status = "success")
+ try:
+ dbconn = sqlite3.connect(dbpath)
+ job_meta = jobs.retrieve_meta(dbconn, job.get_id())
+ for error in parse_errors(
+ filepath, filetype, strain_names(parse_strains(strainsfile)),
+ seek_pos):
+ count = count + 1
+ progress = ((error["position"] / job_meta["filesize"]) * 100)
+ print(f"CURRENT PROGRESS: {progress}")
+ job_meta = jobs.update_meta(
+ dbconn, job_id, message = f"Collected {count} errors",
+ progress = progress)
+ errors.append(error)
+
+ job_meta = jobs.update_meta(
+ dbconn, job_id, errors = errors, progress = 100, status = "success")
+ os.remove(filepath)
+ except Exception as exc:
+ dbconn.rollback()
+ finally:
+ dbconn.close()
@parsebp.route("/fail/<job_id>", methods=["GET"])
-def fail(job_id):
+def fail(job_id: str):
"""Handle parsing failure"""
- old_job = jobs.job(job_id)
- if old_job:
- old_job.refresh()
- job_id = old_job.get_id()
- progress = old_job.meta.get("progress", 0)
- status = old_job.meta["status"]
- results = old_job.meta["results"]
- filename = old_job.meta.get("filename", "uploaded file")
-
- new_job = jobs.enqueue_job(
- "qc_app.parse.queued_collect_errors",
- os.path.join(
- app.config["UPLOAD_FOLDER"], old_job.meta["filename"]),
- results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"),
- app.config["REDIS_URL"], results["position"],
- additional_jobs_meta = {
- "status": "Collecting Errors",
- "filename": old_job.meta["filename"],
- "filesize": old_job.meta["filesize"],
- "starting_line_number": results["line_number"]})
- return redirect(url_for("parse.parse_status", job_id=new_job.get_id()))
-
- return render_template("no_such_job.html", job_id=job_id)
+ try:
+ dbpath = retrieve_dbpath()
+ dbconn = sqlite3.connect(dbpath)
+ old_job = jobs.job(job_id)
+ if old_job:
+ job_id = old_job.get_id()
+ old_meta = jobs.retrieve_meta(dbconn, job_id)
+ progress = old_meta["progress"]
+ status = old_meta["status"]
+ results = old_meta["results"]
+ filename = old_meta["filename"]
+
+ new_job = jobs.enqueue_job(
+ "qc_app.parse.queued_collect_errors",
+ os.path.join(
+ app.config["UPLOAD_FOLDER"], old_meta["filename"]),
+ results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"),
+ app.config["REDIS_URL"], dbpath, results["position"])
+ jobs.update_meta(
+ dbconn, new_job.get_id(), status = "Collecting Errors",
+ filename = old_meta["filename"], filesize = old_meta["filesize"],
+ starting_line_number = results["line_number"],
+ progress = progress)
+ return redirect(url_for("parse.parse_status", job_id=new_job.get_id()))
+
+ return render_template("no_such_job.html", job_id=job_id)
+ except Exception as exc:
+ dbconn.rollback()
+ finally:
+ dbconn.close()