diff options
author | Frederick Muriuki Muriithi | 2022-04-28 15:32:35 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-04-28 15:32:35 +0300 |
commit | 5632dcab27058875de99d63cbd263acfa3a9a2d5 (patch) | |
tree | 93cce3204086c14760c3e76497b8b106100b4a96 /qc_app | |
parent | defc1cf0c1635f3262200a9ba25d8bd0c6fc0a93 (diff) | |
download | gn-uploader-5632dcab27058875de99d63cbd263acfa3a9a2d5.tar.gz |
Use sqlite3 to save the jobs metadata
* Use sqlite to save the jobs metadata and enable UI update of the
progress for large files
Diffstat (limited to 'qc_app')
-rw-r--r-- | qc_app/jobs.py | 71 | ||||
-rw-r--r-- | qc_app/parse.py | 217 |
2 files changed, 192 insertions, 96 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py index f613e61..afea419 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -1,9 +1,12 @@ +import jsonpickle +import sqlite3 + from rq import Queue from rq.job import Job from redis import Redis from flask import current_app as app -def enqueue_job(delayed_fn, *args, **kwargs): +def enqueue_job(delayed_fn: str, *args, **kwargs): """Add job to queue""" with Redis.from_url(app.config["REDIS_URL"]) as rconn: job = Job.create( @@ -11,21 +14,18 @@ def enqueue_job(delayed_fn, *args, **kwargs): key: val for key, val in kwargs.items() if key != "additional_jobs_meta" }, - connection = rconn) - job.meta["status"] = "enqueued" - job.meta["progress"] = 0 - if "additional_jobs_meta" in kwargs: - for key, val in kwargs["additional_jobs_meta"].items(): - job.meta[key] = val - - job.save_meta() + connection = rconn, + timeout = "2h", + result_ttl = "5h", + failure_ttl = "5h" + ) queue = Queue("qcapp_queue", connection=rconn) queue.enqueue_job(job) return job -def job(job_id): +def job(job_id: str): "Retrieve the job" with Redis.from_url(app.config["REDIS_URL"]) as rconn: queue = Queue("qcapp_queue", connection=rconn) @@ -34,14 +34,49 @@ def job(job_id): return job -def update_meta(rconn, stale_job, **kwargs): +def create_jobs_table(dbpath: str): + """Create sqlite3 table to track job progress""" + conn = sqlite3.connect(dbpath) + cursor = conn.cursor() + cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)") + cursor.close() + conn.close() + +def retrieve_meta(conn, job_id: str): + """Retrieve the job's metadata.""" + meta = {} + try: + cursor = conn.cursor() + cursor.execute( + "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id}) + db_row = cursor.fetchone() + # meta = json.loads(db_row [1]) if db_row else {} + meta = jsonpickle.decode(db_row [1]) if db_row else {} + except Exception as exc: + cursor.close() + raise exc + finally: + cursor.close() + + return meta + +def update_meta(conn, job_id: str, **meta): """Update the job's metadata.""" - job = Job.fetch(stale_job.get_id(), connection=rconn) - job.refresh() - meta_dict = {**stale_job.meta, **job.meta, **kwargs} - for key, val in meta_dict.items(): - job.meta[key] = val + try: + cursor = conn.cursor() + old_meta = retrieve_meta(conn, job_id) + meta = {**old_meta, **meta} + query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id" + if not bool(old_meta): + query = "INSERT INTO jobs VALUES (:job_id, :meta)" - job.save_meta() + cursor.execute( + query, {"job_id": job_id, "meta": jsonpickle.encode(meta)}) + conn.commit() + except Exception as exc: + cursor.close() + raise exc + finally: + cursor.close() - return job + return meta diff --git a/qc_app/parse.py b/qc_app/parse.py index 3398918..795cc01 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -1,5 +1,6 @@ """File parsing module""" import os +import sqlite3 from functools import reduce from redis import Redis @@ -23,32 +24,41 @@ from quality_control.parsing import ( parsebp = Blueprint("parse", __name__) -def queued_parse(filepath, filetype, strainsfile, redis_url): +def queued_parse( + filepath: str, filetype: FileType, strainsfile: str, redis_url: str, + dbpath: str): job = get_current_job() + job_id = job.get_id() with Redis.from_url(redis_url) as rconn: - jobs.update_meta(rconn, job, status = "in-progress", progress = 0) - job.save_meta() + dbconn = sqlite3.connect(dbpath) try: + job_meta = jobs.update_meta( + dbconn, job_id, status = "in-progress", progress = 0) parsed = parse_file( filepath, filetype, strain_names(parse_strains(strainsfile))) for line, curr_size in parsed: - jobs.update_meta( - rconn, job, progress = (curr_size/job.meta["filesize"]) * 100, + job_meta = jobs.update_meta( + dbconn, job_id, + progress=((curr_size/job_meta["filesize"]) * 100), message = f"Parsed {curr_size} bytes") - print(f"Progress: {curr_size} bytes: {(curr_size/job.meta['filesize']) * 100}%") os.remove(filepath) - jobs.update_meta( - rconn, job, progress = 10, status = "success", + jobs_meta = jobs.update_meta( + dbconn, job_id, progress = 100, status = "success", message = "no errors found") except ParseError as pe: pe_dict = pe.args[0] - jobs.update_meta( - rconn, job, status = "parse-error", results = { + job_meta = jobs.update_meta( + dbconn, job_id, status = "parse-error", results = { "filename": os.path.basename(filepath), "filetype": filetype, "position": pe_dict["position"], "line_number": pe_dict["line_number"] }) + finally: + dbconn.close() + +def retrieve_dbpath(): + return os.path.join(app.config["UPLOAD_FOLDER"], "jobs.db") @parsebp.route("/parse", methods=["GET"]) def parse(): @@ -73,94 +83,145 @@ def parse(): if errors: return redirect(url_for("entry.index")) + jobs.create_jobs_table(retrieve_dbpath()) filetype = ( FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) job = jobs.enqueue_job( "qc_app.parse.queued_parse", filepath, filetype, os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"], - additional_jobs_meta = { - "filename": filename, "filesize": os.stat(filepath).st_size}) + retrieve_dbpath()) + try: + dbconn = sqlite3.connect(retrieve_dbpath()) + jobs.update_meta( + dbconn, job.get_id(), filename=filename, filesize=os.stat(filepath).st_size, + status="enqueued", progress=0) + except Exception as exc: + import traceback + print(traceback.format_exc()) + dbconn.rollback() + finally: + dbconn.close() + return redirect(url_for("parse.parse_status", job_id=job.get_id())) @parsebp.route("/status/<job_id>", methods=["GET"]) -def parse_status(job_id): - job = jobs.job(job_id) - if job: - job_id = job.get_id() - progress = job.meta["progress"] - status = job.meta["status"] - filename = job.meta.get("filename", "uploaded file") - if status == "success": - return redirect(url_for("parse.results", job_id=job_id)) - - if status == "parse-error": - return redirect(url_for("parse.fail", job_id=job_id)) - - return render_template( - "job_progress.html", - job_id = job_id, - job_status = status, - progress = progress, - message = job.meta.get("message", ""), - job_name = f"Parsing '{filename}'") - - return render_template("no_such_job.html", job_id=job_id) +def parse_status(job_id: str): + try: + dbconn = sqlite3.connect(retrieve_dbpath(), timeout=10) + job = jobs.job(job_id) + if job: + job_id = job.get_id() + job_meta = jobs.retrieve_meta(dbconn, job_id) + progress = job_meta["progress"] + status = job_meta["status"] + filename = job_meta.get("filename", "uploaded file") + if status == "success": + return redirect(url_for("parse.results", job_id=job_id)) + + if status == "parse-error": + return redirect(url_for("parse.fail", job_id=job_id)) + + return render_template( + "job_progress.html", + job_id = job_id, + job_status = status, + progress = progress, + message = job_meta.get("message", ""), + job_name = f"Parsing '{filename}'") + + return render_template("no_such_job.html", job_id=job_id) + except sqlite3.OperationalError: + return redirect(url_for("parse.parse_status", job_id=job_id)) + except Exception as exc: + import traceback + print(traceback.format_exc()) + return exc + finally: + dbconn.close() @parsebp.route("/results/<job_id>", methods=["GET"]) -def results(job_id): +def results(job_id: str): """Show results of parsing...""" job = jobs.job(job_id) if job: - filename = job.meta["filename"] - errors = job.meta.get("errors", []) - return render_template( - "parse_results.html", - errors=errors, - job_name = f"Parsing '{filename}'", - starting_line_number=job.meta.get("starting_line_number", 0)) + try: + dbconn = sqlite3.connect(retrieve_dbpath()) + job_meta = jobs.retrieve_meta(dbconn, job_id) + filename = job_meta["filename"] + errors = job_meta.get("errors", []) + return render_template( + "parse_results.html", + errors=errors, + job_name = f"Parsing '{filename}'", + starting_line_number=job_meta.get("starting_line_number", 0)) + except Exception as exc: + import traceback + print(traceback.format_exc()) + finally: + dbconn.close() return render_template("no_such_job.html", job_id=job_id) -def queued_collect_errors(filepath, filetype, strainsfile, redis_url, seek_pos=0): +def queued_collect_errors( + filepath: str, filetype: FileType, strainsfile: str, redis_url: str, + dbpath: str, seek_pos: int = 0): job = get_current_job() + job_id = job.get_id() errors = [] count = 0 with Redis.from_url(redis_url) as rconn: - for error in parse_errors( - filepath, filetype, strain_names(parse_strains(strainsfile)), - seek_pos): - count = count + 1 - jobs.update_meta( - rconn, job, message = f"Collected {count} errors", progress = ( - (error["position"] / job.meta["filesize"]) * 100)) - errors.append(error) - - jobs.update_meta( - rconn, job, errors = errors, progress = 100, status = "success") + try: + dbconn = sqlite3.connect(dbpath) + job_meta = jobs.retrieve_meta(dbconn, job.get_id()) + for error in parse_errors( + filepath, filetype, strain_names(parse_strains(strainsfile)), + seek_pos): + count = count + 1 + progress = ((error["position"] / job_meta["filesize"]) * 100) + print(f"CURRENT PROGRESS: {progress}") + job_meta = jobs.update_meta( + dbconn, job_id, message = f"Collected {count} errors", + progress = progress) + errors.append(error) + + job_meta = jobs.update_meta( + dbconn, job_id, errors = errors, progress = 100, status = "success") + os.remove(filepath) + except Exception as exc: + dbconn.rollback() + finally: + dbconn.close() @parsebp.route("/fail/<job_id>", methods=["GET"]) -def fail(job_id): +def fail(job_id: str): """Handle parsing failure""" - old_job = jobs.job(job_id) - if old_job: - old_job.refresh() - job_id = old_job.get_id() - progress = old_job.meta.get("progress", 0) - status = old_job.meta["status"] - results = old_job.meta["results"] - filename = old_job.meta.get("filename", "uploaded file") - - new_job = jobs.enqueue_job( - "qc_app.parse.queued_collect_errors", - os.path.join( - app.config["UPLOAD_FOLDER"], old_job.meta["filename"]), - results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"), - app.config["REDIS_URL"], results["position"], - additional_jobs_meta = { - "status": "Collecting Errors", - "filename": old_job.meta["filename"], - "filesize": old_job.meta["filesize"], - "starting_line_number": results["line_number"]}) - return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) - - return render_template("no_such_job.html", job_id=job_id) + try: + dbpath = retrieve_dbpath() + dbconn = sqlite3.connect(dbpath) + old_job = jobs.job(job_id) + if old_job: + job_id = old_job.get_id() + old_meta = jobs.retrieve_meta(dbconn, job_id) + progress = old_meta["progress"] + status = old_meta["status"] + results = old_meta["results"] + filename = old_meta["filename"] + + new_job = jobs.enqueue_job( + "qc_app.parse.queued_collect_errors", + os.path.join( + app.config["UPLOAD_FOLDER"], old_meta["filename"]), + results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"), + app.config["REDIS_URL"], dbpath, results["position"]) + jobs.update_meta( + dbconn, new_job.get_id(), status = "Collecting Errors", + filename = old_meta["filename"], filesize = old_meta["filesize"], + starting_line_number = results["line_number"], + progress = progress) + return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) + + return render_template("no_such_job.html", job_id=job_id) + except Exception as exc: + dbconn.rollback() + finally: + dbconn.close() |