From defc1cf0c1635f3262200a9ba25d8bd0c6fc0a93 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 28 Apr 2022 09:50:58 +0300 Subject: Update queuing and display results of file parsing * Make the 'worker' functions free from needing the application context by passing all the details they need as arguments. * Enable the display of parsing results. --- qc_app/parse.py | 94 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 53 insertions(+), 41 deletions(-) (limited to 'qc_app/parse.py') diff --git a/qc_app/parse.py b/qc_app/parse.py index ffdd123..3398918 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -1,6 +1,8 @@ """File parsing module""" import os +from functools import reduce +from redis import Redis from rq import get_current_job from flask import ( request, @@ -21,32 +23,32 @@ from quality_control.parsing import ( parsebp = Blueprint("parse", __name__) -def queued_parse(filepath, filetype): - from qc_app import create_app, instance_path - app = create_app(instance_path()) - app.app_context().push() - +def queued_parse(filepath, filetype, strainsfile, redis_url): job = get_current_job() - jobs.update_meta(job, status = "in-progress", progress = 0) - job.save_meta() - try: - parsed = parse_file( - filepath, filetype, strain_names(parse_strains("strains.csv"))) - for line, curr_size in parsed: + with Redis.from_url(redis_url) as rconn: + jobs.update_meta(rconn, job, status = "in-progress", progress = 0) + job.save_meta() + try: + parsed = parse_file( + filepath, filetype, strain_names(parse_strains(strainsfile))) + for line, curr_size in parsed: + jobs.update_meta( + rconn, job, progress = (curr_size/job.meta["filesize"]) * 100, + message = f"Parsed {curr_size} bytes") + print(f"Progress: {curr_size} bytes: {(curr_size/job.meta['filesize']) * 100}%") + + os.remove(filepath) jobs.update_meta( - job, progress = (curr_size/job.meta["filesize"]) * 100, - message = f"Parsed {curr_size} bytes") - - os.remove(filepath) - jobs.update_meta( - job, progress = 10, status = "success", message = "no errors found") - except ParseError as pe: - pe_dict = pe.args[0] - jobs.update_meta( - job, status = "parse-error", results = { - "filename": os.path.basename(filepath), "filetype": filetype, - "position": pe_dict["position"] - }) + rconn, job, progress = 10, status = "success", + message = "no errors found") + except ParseError as pe: + pe_dict = pe.args[0] + jobs.update_meta( + rconn, job, status = "parse-error", results = { + "filename": os.path.basename(filepath), "filetype": filetype, + "position": pe_dict["position"], + "line_number": pe_dict["line_number"] + }) @parsebp.route("/parse", methods=["GET"]) def parse(): @@ -75,6 +77,7 @@ def parse(): FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) job = jobs.enqueue_job( "qc_app.parse.queued_parse", filepath, filetype, + os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"], additional_jobs_meta = { "filename": filename, "filesize": os.stat(filepath).st_size}) return redirect(url_for("parse.parse_status", job_id=job.get_id())) @@ -105,28 +108,35 @@ def parse_status(job_id): @parsebp.route("/results/", methods=["GET"]) def results(job_id): - """Indicates success if parsing the file is successful""" - return "STUB: Parse success!!!" + """Show results of parsing...""" + job = jobs.job(job_id) + if job: + filename = job.meta["filename"] + errors = job.meta.get("errors", []) + return render_template( + "parse_results.html", + errors=errors, + job_name = f"Parsing '{filename}'", + starting_line_number=job.meta.get("starting_line_number", 0)) -def queued_collect_errors(filepath, filetype, seek_pos=0): - from qc_app import create_app, instance_path - app = create_app(instance_path()) - app.app_context().push() + return render_template("no_such_job.html", job_id=job_id) +def queued_collect_errors(filepath, filetype, strainsfile, redis_url, seek_pos=0): job = get_current_job() errors = [] count = 0 + with Redis.from_url(redis_url) as rconn: + for error in parse_errors( + filepath, filetype, strain_names(parse_strains(strainsfile)), + seek_pos): + count = count + 1 + jobs.update_meta( + rconn, job, message = f"Collected {count} errors", progress = ( + (error["position"] / job.meta["filesize"]) * 100)) + errors.append(error) - for error in parse_errors( - filepath, filetype, strain_names(parse_strains("strains.csv")), - seek_pos): - count = count + 1 jobs.update_meta( - job, message = f"Collected {count} errors", progress = ( - (error["position"] / job.meta["filesize"]) * 100)) - errors.append(error) - - jobs.update_meta(job, errors = errors, progress = 100, status = "success") + rconn, job, errors = errors, progress = 100, status = "success") @parsebp.route("/fail/", methods=["GET"]) def fail(job_id): @@ -144,11 +154,13 @@ def fail(job_id): "qc_app.parse.queued_collect_errors", os.path.join( app.config["UPLOAD_FOLDER"], old_job.meta["filename"]), - results["filetype"], results["position"], + results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"), + app.config["REDIS_URL"], results["position"], additional_jobs_meta = { "status": "Collecting Errors", "filename": old_job.meta["filename"], - "filesize": old_job.meta["filesize"]}) + "filesize": old_job.meta["filesize"], + "starting_line_number": results["line_number"]}) return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) return render_template("no_such_job.html", job_id=job_id) -- cgit v1.2.3