From defc1cf0c1635f3262200a9ba25d8bd0c6fc0a93 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 28 Apr 2022 09:50:58 +0300 Subject: Update queuing and display results of file parsing * Make the 'worker' functions free from needing the application context by passing all the details they need as arguments. * Enable the display of parsing results. --- qc_app/entry.py | 2 + qc_app/jobs.py | 19 ++++----- qc_app/parse.py | 94 +++++++++++++++++++++++++------------------- qc_app/static/css/styles.css | 34 ++++++++++++++++ 4 files changed, 99 insertions(+), 50 deletions(-) (limited to 'qc_app') diff --git a/qc_app/entry.py b/qc_app/entry.py index 31d64ca..f91e59d 100644 --- a/qc_app/entry.py +++ b/qc_app/entry.py @@ -41,6 +41,8 @@ def upload_file(): filename = secure_filename(text_file.filename) if not os.path.exists(app.config["UPLOAD_FOLDER"]): os.mkdir(app.config["UPLOAD_FOLDER"]) + + filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename) text_file.save(os.path.join(app.config["UPLOAD_FOLDER"], filename)) return redirect(url_for( diff --git a/qc_app/jobs.py b/qc_app/jobs.py index 908c244..f613e61 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -4,6 +4,7 @@ from redis import Redis from flask import current_app as app def enqueue_job(delayed_fn, *args, **kwargs): + """Add job to queue""" with Redis.from_url(app.config["REDIS_URL"]) as rconn: job = Job.create( delayed_fn, args, **{ @@ -25,6 +26,7 @@ def enqueue_job(delayed_fn, *args, **kwargs): return job def job(job_id): + "Retrieve the job" with Redis.from_url(app.config["REDIS_URL"]) as rconn: queue = Queue("qcapp_queue", connection=rconn) job = queue.fetch_job(job_id) @@ -32,15 +34,14 @@ def job(job_id): return job -def update_meta(stale_job, **kwargs): - with Redis.from_url(app.config["REDIS_URL"]) as rconn: - queue = Queue("qcapp_queue", connection=rconn) - job = queue.fetch_job(stale_job.get_id()) - job.refresh() - meta_dict = {**stale_job.meta, **job.meta, **kwargs} - for key, val in meta_dict.items(): - job.meta[key] = val +def update_meta(rconn, stale_job, **kwargs): + """Update the job's metadata.""" + job = Job.fetch(stale_job.get_id(), connection=rconn) + job.refresh() + meta_dict = {**stale_job.meta, **job.meta, **kwargs} + for key, val in meta_dict.items(): + job.meta[key] = val - job.save_meta() + job.save_meta() return job diff --git a/qc_app/parse.py b/qc_app/parse.py index ffdd123..3398918 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -1,6 +1,8 @@ """File parsing module""" import os +from functools import reduce +from redis import Redis from rq import get_current_job from flask import ( request, @@ -21,32 +23,32 @@ from quality_control.parsing import ( parsebp = Blueprint("parse", __name__) -def queued_parse(filepath, filetype): - from qc_app import create_app, instance_path - app = create_app(instance_path()) - app.app_context().push() - +def queued_parse(filepath, filetype, strainsfile, redis_url): job = get_current_job() - jobs.update_meta(job, status = "in-progress", progress = 0) - job.save_meta() - try: - parsed = parse_file( - filepath, filetype, strain_names(parse_strains("strains.csv"))) - for line, curr_size in parsed: + with Redis.from_url(redis_url) as rconn: + jobs.update_meta(rconn, job, status = "in-progress", progress = 0) + job.save_meta() + try: + parsed = parse_file( + filepath, filetype, strain_names(parse_strains(strainsfile))) + for line, curr_size in parsed: + jobs.update_meta( + rconn, job, progress = (curr_size/job.meta["filesize"]) * 100, + message = f"Parsed {curr_size} bytes") + print(f"Progress: {curr_size} bytes: {(curr_size/job.meta['filesize']) * 100}%") + + os.remove(filepath) jobs.update_meta( - job, progress = (curr_size/job.meta["filesize"]) * 100, - message = f"Parsed {curr_size} bytes") - - os.remove(filepath) - jobs.update_meta( - job, progress = 10, status = "success", message = "no errors found") - except ParseError as pe: - pe_dict = pe.args[0] - jobs.update_meta( - job, status = "parse-error", results = { - "filename": os.path.basename(filepath), "filetype": filetype, - "position": pe_dict["position"] - }) + rconn, job, progress = 10, status = "success", + message = "no errors found") + except ParseError as pe: + pe_dict = pe.args[0] + jobs.update_meta( + rconn, job, status = "parse-error", results = { + "filename": os.path.basename(filepath), "filetype": filetype, + "position": pe_dict["position"], + "line_number": pe_dict["line_number"] + }) @parsebp.route("/parse", methods=["GET"]) def parse(): @@ -75,6 +77,7 @@ def parse(): FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) job = jobs.enqueue_job( "qc_app.parse.queued_parse", filepath, filetype, + os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"], additional_jobs_meta = { "filename": filename, "filesize": os.stat(filepath).st_size}) return redirect(url_for("parse.parse_status", job_id=job.get_id())) @@ -105,28 +108,35 @@ def parse_status(job_id): @parsebp.route("/results/", methods=["GET"]) def results(job_id): - """Indicates success if parsing the file is successful""" - return "STUB: Parse success!!!" + """Show results of parsing...""" + job = jobs.job(job_id) + if job: + filename = job.meta["filename"] + errors = job.meta.get("errors", []) + return render_template( + "parse_results.html", + errors=errors, + job_name = f"Parsing '{filename}'", + starting_line_number=job.meta.get("starting_line_number", 0)) -def queued_collect_errors(filepath, filetype, seek_pos=0): - from qc_app import create_app, instance_path - app = create_app(instance_path()) - app.app_context().push() + return render_template("no_such_job.html", job_id=job_id) +def queued_collect_errors(filepath, filetype, strainsfile, redis_url, seek_pos=0): job = get_current_job() errors = [] count = 0 + with Redis.from_url(redis_url) as rconn: + for error in parse_errors( + filepath, filetype, strain_names(parse_strains(strainsfile)), + seek_pos): + count = count + 1 + jobs.update_meta( + rconn, job, message = f"Collected {count} errors", progress = ( + (error["position"] / job.meta["filesize"]) * 100)) + errors.append(error) - for error in parse_errors( - filepath, filetype, strain_names(parse_strains("strains.csv")), - seek_pos): - count = count + 1 jobs.update_meta( - job, message = f"Collected {count} errors", progress = ( - (error["position"] / job.meta["filesize"]) * 100)) - errors.append(error) - - jobs.update_meta(job, errors = errors, progress = 100, status = "success") + rconn, job, errors = errors, progress = 100, status = "success") @parsebp.route("/fail/", methods=["GET"]) def fail(job_id): @@ -144,11 +154,13 @@ def fail(job_id): "qc_app.parse.queued_collect_errors", os.path.join( app.config["UPLOAD_FOLDER"], old_job.meta["filename"]), - results["filetype"], results["position"], + results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"), + app.config["REDIS_URL"], results["position"], additional_jobs_meta = { "status": "Collecting Errors", "filename": old_job.meta["filename"], - "filesize": old_job.meta["filesize"]}) + "filesize": old_job.meta["filesize"], + "starting_line_number": results["line_number"]}) return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) return render_template("no_such_job.html", job_id=job_id) diff --git a/qc_app/static/css/styles.css b/qc_app/static/css/styles.css index d009e40..a5f0e01 100644 --- a/qc_app/static/css/styles.css +++ b/qc_app/static/css/styles.css @@ -41,3 +41,37 @@ fieldset { background-color: #F8D7DA; border-color: #E7C6C9; } + +.alert-success { + colour: #448944; + font-weight: bold; + background-color: #AAEEAA; +} + +table { + border-collapse: collapse; +} + +.reports-table { + border: 1px solid; + border-color: #336699; +} + +.reports-table thead { + color: #FEFEFE; + background-color: #336699; + border-width: 0 1px 0 1px; + border-style: solid; + border-color: #336699; +} + +.reports-table thead tr th { + text-transform: capitalize; +} + +.reports-table th,td { + border-width: 0 1px 0 1px; + border-style: solid; + border-color: #336699; + padding: 0 0.3em 0.3em 0.3em; +} -- cgit v1.2.3