"""File parsing module""" import os from rq import get_current_job from flask import ( request, url_for, redirect, Blueprint, render_template, current_app as app) from . import jobs from quality_control.errors import ParseError from quality_control.parsing import ( FileType, parse_file, strain_names, parse_errors, parse_strains) parsebp = Blueprint("parse", __name__) def queued_parse(filepath, filetype): from qc_app import create_app, instance_path app = create_app(instance_path()) app.app_context().push() job = get_current_job() jobs.update_meta(job, status = "in-progress", progress = 0) job.save_meta() try: parsed = parse_file( filepath, filetype, strain_names(parse_strains("strains.csv"))) for line, curr_size in parsed: jobs.update_meta( job, progress = (curr_size/job.meta["filesize"]) * 100, message = f"Parsed {curr_size} bytes") os.remove(filepath) jobs.update_meta( job, progress = 10, status = "success", message = "no errors found") except ParseError as pe: pe_dict = pe.args[0] jobs.update_meta( job, status = "parse-error", results = { "filename": os.path.basename(filepath), "filetype": filetype, "position": pe_dict["position"] }) @parsebp.route("/parse", methods=["GET"]) def parse(): """Trigger file parsing""" # TODO: Maybe implement external process to parse the files errors = False filename = request.args.get("filename") filetype = request.args.get("filetype") if filename is None: flash("No file provided", "alert-error") errors = True if filetype is None: flash("No filetype provided", "alert-error") errors = True filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename) if not os.path.exists(filepath): flash("Selected file does not exist (any longer)", "alert-danger") errors = True if errors: return redirect(url_for("entry.index")) filetype = ( FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) job = jobs.enqueue_job( "qc_app.parse.queued_parse", filepath, filetype, additional_jobs_meta = { "filename": filename, "filesize": os.stat(filepath).st_size}) return redirect(url_for("parse.parse_status", job_id=job.get_id())) @parsebp.route("/status/", methods=["GET"]) def parse_status(job_id): job = jobs.job(job_id) if job: job_id = job.get_id() progress = job.meta["progress"] status = job.meta["status"] filename = job.meta.get("filename", "uploaded file") if status == "success": return redirect(url_for("parse.results", job_id=job_id)) if status == "parse-error": return redirect(url_for("parse.fail", job_id=job_id)) return render_template( "job_progress.html", job_id = job_id, job_status = status, progress = progress, message = job.meta.get("message", ""), job_name = f"Parsing '{filename}'") return render_template("no_such_job.html", job_id=job_id) @parsebp.route("/results/", methods=["GET"]) def results(job_id): """Indicates success if parsing the file is successful""" return "STUB: Parse success!!!" def queued_collect_errors(filepath, filetype, seek_pos=0): from qc_app import create_app, instance_path app = create_app(instance_path()) app.app_context().push() job = get_current_job() errors = [] count = 0 for error in parse_errors( filepath, filetype, strain_names(parse_strains("strains.csv")), seek_pos): count = count + 1 jobs.update_meta( job, message = f"Collected {count} errors", progress = ( (error["position"] / job.meta["filesize"]) * 100)) errors.append(error) jobs.update_meta(job, errors = errors, progress = 100, status = "success") @parsebp.route("/fail/", methods=["GET"]) def fail(job_id): """Handle parsing failure""" old_job = jobs.job(job_id) if old_job: old_job.refresh() job_id = old_job.get_id() progress = old_job.meta.get("progress", 0) status = old_job.meta["status"] results = old_job.meta["results"] filename = old_job.meta.get("filename", "uploaded file") new_job = jobs.enqueue_job( "qc_app.parse.queued_collect_errors", os.path.join( app.config["UPLOAD_FOLDER"], old_job.meta["filename"]), results["filetype"], results["position"], additional_jobs_meta = { "status": "Collecting Errors", "filename": old_job.meta["filename"], "filesize": old_job.meta["filesize"]}) return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) return render_template("no_such_job.html", job_id=job_id)