From 5632dcab27058875de99d63cbd263acfa3a9a2d5 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 28 Apr 2022 15:32:35 +0300 Subject: Use sqlite3 to save the jobs metadata * Use sqlite to save the jobs metadata and enable UI update of the progress for large files --- qc_app/parse.py | 217 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 139 insertions(+), 78 deletions(-) (limited to 'qc_app/parse.py') diff --git a/qc_app/parse.py b/qc_app/parse.py index 3398918..795cc01 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -1,5 +1,6 @@ """File parsing module""" import os +import sqlite3 from functools import reduce from redis import Redis @@ -23,32 +24,41 @@ from quality_control.parsing import ( parsebp = Blueprint("parse", __name__) -def queued_parse(filepath, filetype, strainsfile, redis_url): +def queued_parse( + filepath: str, filetype: FileType, strainsfile: str, redis_url: str, + dbpath: str): job = get_current_job() + job_id = job.get_id() with Redis.from_url(redis_url) as rconn: - jobs.update_meta(rconn, job, status = "in-progress", progress = 0) - job.save_meta() + dbconn = sqlite3.connect(dbpath) try: + job_meta = jobs.update_meta( + dbconn, job_id, status = "in-progress", progress = 0) parsed = parse_file( filepath, filetype, strain_names(parse_strains(strainsfile))) for line, curr_size in parsed: - jobs.update_meta( - rconn, job, progress = (curr_size/job.meta["filesize"]) * 100, + job_meta = jobs.update_meta( + dbconn, job_id, + progress=((curr_size/job_meta["filesize"]) * 100), message = f"Parsed {curr_size} bytes") - print(f"Progress: {curr_size} bytes: {(curr_size/job.meta['filesize']) * 100}%") os.remove(filepath) - jobs.update_meta( - rconn, job, progress = 10, status = "success", + jobs_meta = jobs.update_meta( + dbconn, job_id, progress = 100, status = "success", message = "no errors found") except ParseError as pe: pe_dict = pe.args[0] - jobs.update_meta( - rconn, job, status = "parse-error", results = { + job_meta = jobs.update_meta( + dbconn, job_id, status = "parse-error", results = { "filename": os.path.basename(filepath), "filetype": filetype, "position": pe_dict["position"], "line_number": pe_dict["line_number"] }) + finally: + dbconn.close() + +def retrieve_dbpath(): + return os.path.join(app.config["UPLOAD_FOLDER"], "jobs.db") @parsebp.route("/parse", methods=["GET"]) def parse(): @@ -73,94 +83,145 @@ def parse(): if errors: return redirect(url_for("entry.index")) + jobs.create_jobs_table(retrieve_dbpath()) filetype = ( FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) job = jobs.enqueue_job( "qc_app.parse.queued_parse", filepath, filetype, os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"], - additional_jobs_meta = { - "filename": filename, "filesize": os.stat(filepath).st_size}) + retrieve_dbpath()) + try: + dbconn = sqlite3.connect(retrieve_dbpath()) + jobs.update_meta( + dbconn, job.get_id(), filename=filename, filesize=os.stat(filepath).st_size, + status="enqueued", progress=0) + except Exception as exc: + import traceback + print(traceback.format_exc()) + dbconn.rollback() + finally: + dbconn.close() + return redirect(url_for("parse.parse_status", job_id=job.get_id())) @parsebp.route("/status/", methods=["GET"]) -def parse_status(job_id): - job = jobs.job(job_id) - if job: - job_id = job.get_id() - progress = job.meta["progress"] - status = job.meta["status"] - filename = job.meta.get("filename", "uploaded file") - if status == "success": - return redirect(url_for("parse.results", job_id=job_id)) - - if status == "parse-error": - return redirect(url_for("parse.fail", job_id=job_id)) - - return render_template( - "job_progress.html", - job_id = job_id, - job_status = status, - progress = progress, - message = job.meta.get("message", ""), - job_name = f"Parsing '{filename}'") - - return render_template("no_such_job.html", job_id=job_id) +def parse_status(job_id: str): + try: + dbconn = sqlite3.connect(retrieve_dbpath(), timeout=10) + job = jobs.job(job_id) + if job: + job_id = job.get_id() + job_meta = jobs.retrieve_meta(dbconn, job_id) + progress = job_meta["progress"] + status = job_meta["status"] + filename = job_meta.get("filename", "uploaded file") + if status == "success": + return redirect(url_for("parse.results", job_id=job_id)) + + if status == "parse-error": + return redirect(url_for("parse.fail", job_id=job_id)) + + return render_template( + "job_progress.html", + job_id = job_id, + job_status = status, + progress = progress, + message = job_meta.get("message", ""), + job_name = f"Parsing '{filename}'") + + return render_template("no_such_job.html", job_id=job_id) + except sqlite3.OperationalError: + return redirect(url_for("parse.parse_status", job_id=job_id)) + except Exception as exc: + import traceback + print(traceback.format_exc()) + return exc + finally: + dbconn.close() @parsebp.route("/results/", methods=["GET"]) -def results(job_id): +def results(job_id: str): """Show results of parsing...""" job = jobs.job(job_id) if job: - filename = job.meta["filename"] - errors = job.meta.get("errors", []) - return render_template( - "parse_results.html", - errors=errors, - job_name = f"Parsing '{filename}'", - starting_line_number=job.meta.get("starting_line_number", 0)) + try: + dbconn = sqlite3.connect(retrieve_dbpath()) + job_meta = jobs.retrieve_meta(dbconn, job_id) + filename = job_meta["filename"] + errors = job_meta.get("errors", []) + return render_template( + "parse_results.html", + errors=errors, + job_name = f"Parsing '{filename}'", + starting_line_number=job_meta.get("starting_line_number", 0)) + except Exception as exc: + import traceback + print(traceback.format_exc()) + finally: + dbconn.close() return render_template("no_such_job.html", job_id=job_id) -def queued_collect_errors(filepath, filetype, strainsfile, redis_url, seek_pos=0): +def queued_collect_errors( + filepath: str, filetype: FileType, strainsfile: str, redis_url: str, + dbpath: str, seek_pos: int = 0): job = get_current_job() + job_id = job.get_id() errors = [] count = 0 with Redis.from_url(redis_url) as rconn: - for error in parse_errors( - filepath, filetype, strain_names(parse_strains(strainsfile)), - seek_pos): - count = count + 1 - jobs.update_meta( - rconn, job, message = f"Collected {count} errors", progress = ( - (error["position"] / job.meta["filesize"]) * 100)) - errors.append(error) - - jobs.update_meta( - rconn, job, errors = errors, progress = 100, status = "success") + try: + dbconn = sqlite3.connect(dbpath) + job_meta = jobs.retrieve_meta(dbconn, job.get_id()) + for error in parse_errors( + filepath, filetype, strain_names(parse_strains(strainsfile)), + seek_pos): + count = count + 1 + progress = ((error["position"] / job_meta["filesize"]) * 100) + print(f"CURRENT PROGRESS: {progress}") + job_meta = jobs.update_meta( + dbconn, job_id, message = f"Collected {count} errors", + progress = progress) + errors.append(error) + + job_meta = jobs.update_meta( + dbconn, job_id, errors = errors, progress = 100, status = "success") + os.remove(filepath) + except Exception as exc: + dbconn.rollback() + finally: + dbconn.close() @parsebp.route("/fail/", methods=["GET"]) -def fail(job_id): +def fail(job_id: str): """Handle parsing failure""" - old_job = jobs.job(job_id) - if old_job: - old_job.refresh() - job_id = old_job.get_id() - progress = old_job.meta.get("progress", 0) - status = old_job.meta["status"] - results = old_job.meta["results"] - filename = old_job.meta.get("filename", "uploaded file") - - new_job = jobs.enqueue_job( - "qc_app.parse.queued_collect_errors", - os.path.join( - app.config["UPLOAD_FOLDER"], old_job.meta["filename"]), - results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"), - app.config["REDIS_URL"], results["position"], - additional_jobs_meta = { - "status": "Collecting Errors", - "filename": old_job.meta["filename"], - "filesize": old_job.meta["filesize"], - "starting_line_number": results["line_number"]}) - return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) - - return render_template("no_such_job.html", job_id=job_id) + try: + dbpath = retrieve_dbpath() + dbconn = sqlite3.connect(dbpath) + old_job = jobs.job(job_id) + if old_job: + job_id = old_job.get_id() + old_meta = jobs.retrieve_meta(dbconn, job_id) + progress = old_meta["progress"] + status = old_meta["status"] + results = old_meta["results"] + filename = old_meta["filename"] + + new_job = jobs.enqueue_job( + "qc_app.parse.queued_collect_errors", + os.path.join( + app.config["UPLOAD_FOLDER"], old_meta["filename"]), + results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"), + app.config["REDIS_URL"], dbpath, results["position"]) + jobs.update_meta( + dbconn, new_job.get_id(), status = "Collecting Errors", + filename = old_meta["filename"], filesize = old_meta["filesize"], + starting_line_number = results["line_number"], + progress = progress) + return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) + + return render_template("no_such_job.html", job_id=job_id) + except Exception as exc: + dbconn.rollback() + finally: + dbconn.close() -- cgit v1.2.3