aboutsummaryrefslogtreecommitdiff
path: root/qc_app/parse.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-05-19 15:26:15 +0300
committerFrederick Muriuki Muriithi2022-05-19 15:30:20 +0300
commit2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c (patch)
tree9e0fdcc010925a9f6d5674d41991b6185aea54e3 /qc_app/parse.py
parent27f6e9e28f2a3244bdd00336cf918de97b2ceed6 (diff)
downloadgn-uploader-2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c.tar.gz
Update Web-UI: use new error collection paradigm
- README.org: document how to run scripts manually - manifest.scm: remove python-rq as a dependency - qc_app/jobs.py: rework job launching and processing - qc_app/parse.py: use reworked job processing - qc_app/templates/job_progress.html: display progress correctly - qc_app/templates/parse_results.html: display final results - scripts/worker.py: new worker script
Diffstat (limited to 'qc_app/parse.py')
-rw-r--r--qc_app/parse.py237
1 files changed, 71 insertions, 166 deletions
diff --git a/qc_app/parse.py b/qc_app/parse.py
index baad9a6..d2fa030 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -1,10 +1,9 @@
"""File parsing module"""
import os
-import sqlite3
from functools import reduce
+import jsonpickle
from redis import Redis
-from rq import get_current_job
from flask import (
request,
url_for,
@@ -14,50 +13,13 @@ from flask import (
current_app as app)
from . import jobs
-from quality_control.errors import ParseError
+from quality_control.errors import InvalidValue
from quality_control.parsing import (
FileType,
- parse_file,
- strain_names,
- parse_errors)
+ strain_names)
parsebp = Blueprint("parse", __name__)
-def queued_parse(
- filepath: str, filetype: FileType, strainsfile: str, redis_url: str,
- dbpath: str):
- job = get_current_job()
- job_id = job.get_id()
- with Redis.from_url(redis_url) as rconn:
- dbconn = sqlite3.connect(dbpath)
- try:
- job_meta = jobs.update_meta(
- dbconn, job_id, status = "in-progress", progress = 0)
- parsed = parse_file(filepath, filetype, strain_names(strainsfile))
- for line, curr_size in parsed:
- job_meta = jobs.update_meta(
- dbconn, job_id,
- progress=((curr_size/job_meta["filesize"]) * 100),
- message = f"Parsed {curr_size} bytes")
-
- os.remove(filepath)
- jobs_meta = jobs.update_meta(
- dbconn, job_id, progress = 100, status = "success",
- message = "no errors found")
- except ParseError as pe:
- pe_dict = pe.args[0]
- job_meta = jobs.update_meta(
- dbconn, job_id, status = "parse-error", results = {
- "filename": os.path.basename(filepath), "filetype": filetype,
- "position": pe_dict["position"],
- "line_number": pe_dict["line_number"]
- })
- finally:
- dbconn.close()
-
-def retrieve_dbpath():
- return os.path.join(app.config["UPLOAD_FOLDER"], "jobs.db")
-
@parsebp.route("/parse", methods=["GET"])
def parse():
"""Trigger file parsing"""
@@ -73,6 +35,10 @@ def parse():
flash("No filetype provided", "alert-error")
errors = True
+ if filetype not in ("average", "standard-error"):
+ flash("Invalid filetype provided", "alert-error")
+ errors = True
+
filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename)
if not os.path.exists(filepath):
flash("Selected file does not exist (any longer)", "alert-danger")
@@ -81,144 +47,83 @@ def parse():
if errors:
return redirect(url_for("entry.index"))
- jobs.create_jobs_table(retrieve_dbpath())
- filetype = (
- FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR)
- job = jobs.enqueue_job(
- "qc_app.parse.queued_parse", filepath, filetype,
- os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"],
- retrieve_dbpath())
- try:
- dbconn = sqlite3.connect(retrieve_dbpath())
- jobs.update_meta(
- dbconn, job.get_id(), filename=filename, filesize=os.stat(filepath).st_size,
- status="enqueued", progress=0)
- except Exception as exc:
- import traceback
- print(traceback.format_exc())
- dbconn.rollback()
- finally:
- dbconn.close()
-
- return redirect(url_for("parse.parse_status", job_id=job.get_id()))
+ with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+ job = jobs.launch_job(
+ rconn, filepath, filetype, app.config["REDIS_URL"],
+ f"{app.config['UPLOAD_FOLDER']}/job_errors")
+
+ return redirect(url_for("parse.parse_status", job_id=job["job_id"]))
@parsebp.route("/status/<job_id>", methods=["GET"])
def parse_status(job_id: str):
- try:
- dbconn = sqlite3.connect(retrieve_dbpath(), timeout=10)
- job = jobs.job(job_id)
- if job:
- job_id = job.get_id()
- job_meta = jobs.retrieve_meta(dbconn, job_id)
- progress = job_meta["progress"]
- status = job_meta["status"]
- filename = job_meta.get("filename", "uploaded file")
- if status == "success":
- return redirect(url_for("parse.results", job_id=job_id))
-
- if status == "parse-error":
+ with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+ job = jobs.job(rconn, job_id)
+
+ if job:
+ error_filename = jobs.error_filename(
+ job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
+ if os.path.exists(error_filename):
+ stat = os.stat(error_filename)
+ if stat.st_size > 0:
return redirect(url_for("parse.fail", job_id=job_id))
- return render_template(
- "job_progress.html",
- job_id = job_id,
- job_status = status,
- progress = progress,
- message = job_meta.get("message", ""),
- job_name = f"Parsing '{filename}'")
-
- return render_template("no_such_job.html", job_id=job_id)
- except sqlite3.OperationalError:
- return redirect(url_for("parse.parse_status", job_id=job_id))
- except Exception as exc:
- import traceback
- print(traceback.format_exc())
- return exc
- finally:
- dbconn.close()
+ print(f"THE JOB {job}")
+ job_id = job["job_id"]
+ progress = float(job["percent"])
+ status = job["status"]
+ filename = job.get("filename", "uploaded file")
+ if status == "success":
+ return redirect(url_for("parse.results", job_id=job_id))
+
+ if status == "parse-error":
+ return redirect(url_for("parse.fail", job_id=job_id))
+
+ return render_template(
+ "job_progress.html",
+ job_id = job_id,
+ job_status = status,
+ progress = progress,
+ message = job.get("message", ""),
+ job_name = f"Parsing '{filename}'")
+
+ return render_template("no_such_job.html", job_id=job_id)
@parsebp.route("/results/<job_id>", methods=["GET"])
def results(job_id: str):
"""Show results of parsing..."""
- job = jobs.job(job_id)
+ with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+ job = jobs.job(rconn, job_id)
+
if job:
- try:
- dbconn = sqlite3.connect(retrieve_dbpath())
- job_meta = jobs.retrieve_meta(dbconn, job_id)
- filename = job_meta["filename"]
- errors = job_meta.get("errors", [])
- return render_template(
- "parse_results.html",
- errors=errors,
- job_name = f"Parsing '{filename}'",
- starting_line_number=job_meta.get("starting_line_number", 0))
- except Exception as exc:
- import traceback
- print(traceback.format_exc())
- finally:
- dbconn.close()
+ filename = job["filename"]
+ errors = jsonpickle.decode(job["errors"])
+ return render_template(
+ "parse_results.html",
+ errors=errors,
+ job_name = f"Parsing '{filename}'",
+ isinvalidvalue=lambda item: isinstance(item, InvalidValue))
return render_template("no_such_job.html", job_id=job_id)
-def queued_collect_errors(
- filepath: str, filetype: FileType, strainsfile: str, redis_url: str,
- dbpath: str, seek_pos: int = 0):
- job = get_current_job()
- job_id = job.get_id()
- errors = []
- count = 0
- with Redis.from_url(redis_url) as rconn:
- try:
- dbconn = sqlite3.connect(dbpath)
- job_meta = jobs.retrieve_meta(dbconn, job.get_id())
- for error in parse_errors(
- filepath, filetype, strain_names(strainsfile),
- seek_pos):
- count = count + 1
- progress = ((error["position"] / job_meta["filesize"]) * 100)
- job_meta = jobs.update_meta(
- dbconn, job_id, message = f"Collected {count} errors",
- progress = progress)
- errors.append(error)
-
- job_meta = jobs.update_meta(
- dbconn, job_id, errors = errors, progress = 100, status = "success")
- os.remove(filepath)
- except Exception as exc:
- dbconn.rollback()
- finally:
- dbconn.close()
-
@parsebp.route("/fail/<job_id>", methods=["GET"])
def fail(job_id: str):
"""Handle parsing failure"""
- try:
- dbpath = retrieve_dbpath()
- dbconn = sqlite3.connect(dbpath)
- old_job = jobs.job(job_id)
- if old_job:
- job_id = old_job.get_id()
- old_meta = jobs.retrieve_meta(dbconn, job_id)
- progress = old_meta["progress"]
- status = old_meta["status"]
- results = old_meta["results"]
- filename = old_meta["filename"]
-
- new_job = jobs.enqueue_job(
- "qc_app.parse.queued_collect_errors",
- os.path.join(
- app.config["UPLOAD_FOLDER"], old_meta["filename"]),
- results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"),
- app.config["REDIS_URL"], dbpath, results["position"])
- jobs.update_meta(
- dbconn, new_job.get_id(), status = "Collecting Errors",
- filename = old_meta["filename"], filesize = old_meta["filesize"],
- starting_line_number = results["line_number"],
- progress = progress)
- return redirect(url_for("parse.parse_status", job_id=new_job.get_id()))
-
- return render_template("no_such_job.html", job_id=job_id)
- except Exception as exc:
- dbconn.rollback()
- finally:
- dbconn.close()
+ with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+ old_job = jobs.job(rconn, job_id)
+
+ if old_job:
+ error_filename = jobs.error_filename(
+ job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
+ if os.path.exists(error_filename):
+ stat = os.stat(error_filename)
+ if stat.st_size > 0:
+ with open(error_filename) as errorfile:
+ return errorfile.read()
+
+ # job_id = old_job["job_id"]
+ progress = old_job["progress"]
+ status = old_job["status"]
+ results = old_job["results"]
+ filename = old_job["filename"]
+
+ return render_template("no_such_job.html", job_id=job_id)