"""File parsing module""" import os import sqlite3 from functools import reduce from redis import Redis from rq import get_current_job from flask import ( request, url_for, redirect, Blueprint, render_template, current_app as app) from . import jobs from quality_control.errors import ParseError from quality_control.parsing import ( FileType, parse_file, strain_names, parse_errors) parsebp = Blueprint("parse", __name__) def queued_parse( filepath: str, filetype: FileType, strainsfile: str, redis_url: str, dbpath: str): job = get_current_job() job_id = job.get_id() with Redis.from_url(redis_url) as rconn: dbconn = sqlite3.connect(dbpath) try: job_meta = jobs.update_meta( dbconn, job_id, status = "in-progress", progress = 0) parsed = parse_file(filepath, filetype, strain_names(strainsfile)) for line, curr_size in parsed: job_meta = jobs.update_meta( dbconn, job_id, progress=((curr_size/job_meta["filesize"]) * 100), message = f"Parsed {curr_size} bytes") os.remove(filepath) jobs_meta = jobs.update_meta( dbconn, job_id, progress = 100, status = "success", message = "no errors found") except ParseError as pe: pe_dict = pe.args[0] job_meta = jobs.update_meta( dbconn, job_id, status = "parse-error", results = { "filename": os.path.basename(filepath), "filetype": filetype, "position": pe_dict["position"], "line_number": pe_dict["line_number"] }) finally: dbconn.close() def retrieve_dbpath(): return os.path.join(app.config["UPLOAD_FOLDER"], "jobs.db") @parsebp.route("/parse", methods=["GET"]) def parse(): """Trigger file parsing""" # TODO: Maybe implement external process to parse the files errors = False filename = request.args.get("filename") filetype = request.args.get("filetype") if filename is None: flash("No file provided", "alert-error") errors = True if filetype is None: flash("No filetype provided", "alert-error") errors = True filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename) if not os.path.exists(filepath): flash("Selected file does not exist (any longer)", "alert-danger") errors = True if errors: return redirect(url_for("entry.index")) jobs.create_jobs_table(retrieve_dbpath()) filetype = ( FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) job = jobs.enqueue_job( "qc_app.parse.queued_parse", filepath, filetype, os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"], retrieve_dbpath()) try: dbconn = sqlite3.connect(retrieve_dbpath()) jobs.update_meta( dbconn, job.get_id(), filename=filename, filesize=os.stat(filepath).st_size, status="enqueued", progress=0) except Exception as exc: import traceback print(traceback.format_exc()) dbconn.rollback() finally: dbconn.close() return redirect(url_for("parse.parse_status", job_id=job.get_id())) @parsebp.route("/status/", methods=["GET"]) def parse_status(job_id: str): try: dbconn = sqlite3.connect(retrieve_dbpath(), timeout=10) job = jobs.job(job_id) if job: job_id = job.get_id() job_meta = jobs.retrieve_meta(dbconn, job_id) progress = job_meta["progress"] status = job_meta["status"] filename = job_meta.get("filename", "uploaded file") if status == "success": return redirect(url_for("parse.results", job_id=job_id)) if status == "parse-error": return redirect(url_for("parse.fail", job_id=job_id)) return render_template( "job_progress.html", job_id = job_id, job_status = status, progress = progress, message = job_meta.get("message", ""), job_name = f"Parsing '{filename}'") return render_template("no_such_job.html", job_id=job_id) except sqlite3.OperationalError: return 
redirect(url_for("parse.parse_status", job_id=job_id)) except Exception as exc: import traceback print(traceback.format_exc()) return exc finally: dbconn.close() @parsebp.route("/results/", methods=["GET"]) def results(job_id: str): """Show results of parsing...""" job = jobs.job(job_id) if job: try: dbconn = sqlite3.connect(retrieve_dbpath()) job_meta = jobs.retrieve_meta(dbconn, job_id) filename = job_meta["filename"] errors = job_meta.get("errors", []) return render_template( "parse_results.html", errors=errors, job_name = f"Parsing '{filename}'", starting_line_number=job_meta.get("starting_line_number", 0)) except Exception as exc: import traceback print(traceback.format_exc()) finally: dbconn.close() return render_template("no_such_job.html", job_id=job_id) def queued_collect_errors( filepath: str, filetype: FileType, strainsfile: str, redis_url: str, dbpath: str, seek_pos: int = 0): job = get_current_job() job_id = job.get_id() errors = [] count = 0 with Redis.from_url(redis_url) as rconn: try: dbconn = sqlite3.connect(dbpath) job_meta = jobs.retrieve_meta(dbconn, job.get_id()) for error in parse_errors( filepath, filetype, strain_names(strainsfile), seek_pos): count = count + 1 progress = ((error["position"] / job_meta["filesize"]) * 100) job_meta = jobs.update_meta( dbconn, job_id, message = f"Collected {count} errors", progress = progress) errors.append(error) job_meta = jobs.update_meta( dbconn, job_id, errors = errors, progress = 100, status = "success") os.remove(filepath) except Exception as exc: dbconn.rollback() finally: dbconn.close() @parsebp.route("/fail/", methods=["GET"]) def fail(job_id: str): """Handle parsing failure""" try: dbpath = retrieve_dbpath() dbconn = sqlite3.connect(dbpath) old_job = jobs.job(job_id) if old_job: job_id = old_job.get_id() old_meta = jobs.retrieve_meta(dbconn, job_id) progress = old_meta["progress"] status = old_meta["status"] results = old_meta["results"] filename = old_meta["filename"] new_job = jobs.enqueue_job( "qc_app.parse.queued_collect_errors", os.path.join( app.config["UPLOAD_FOLDER"], old_meta["filename"]), results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"], dbpath, results["position"]) jobs.update_meta( dbconn, new_job.get_id(), status = "Collecting Errors", filename = old_meta["filename"], filesize = old_meta["filesize"], starting_line_number = results["line_number"], progress = progress) return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) return render_template("no_such_job.html", job_id=job_id) except Exception as exc: dbconn.rollback() finally: dbconn.close()