aboutsummaryrefslogtreecommitdiff
path: root/qc_app
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-04-26 15:18:26 +0300
committerFrederick Muriuki Muriithi2022-04-26 15:18:26 +0300
commitea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d (patch)
treef4dc7e29b8e49a1cbf2412d0bcab55294957e021 /qc_app
parente6895f5bac672d2e1d2a04fe8118fa55c3a40b91 (diff)
downloadgn-uploader-ea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d.tar.gz
Fix issues caught processing the jobs
* Create and push the application context for the worker functions * Fix the update of meta fields
Diffstat (limited to 'qc_app')
-rw-r--r--qc_app/__init__.py10
-rw-r--r--qc_app/jobs.py33
-rw-r--r--qc_app/parse.py90
-rw-r--r--qc_app/templates/job_progress.html2
4 files changed, 107 insertions, 28 deletions
diff --git a/qc_app/__init__.py b/qc_app/__init__.py
index 35cc422..7f423c2 100644
--- a/qc_app/__init__.py
+++ b/qc_app/__init__.py
@@ -7,6 +7,16 @@ from flask import Flask
from .entry import entrybp
from .parse import parsebp
+def instance_path():
+ """Retrieve the `instance_path`. Raise an exception if not defined."""
+ path = os.getenv("QCAPP_INSTANCE_PATH", None)
+ if path is None:
+ raise Exception((
+ "Configuration Error: Set the `QCAPP_INSTANCE_PATH` environment "
+ "variable."))
+
+ return path
+
def create_app(instance_path):
"""The application factory"""
app = Flask(
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index dbeb9ce..908c244 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,19 +1,46 @@
from rq import Queue
+from rq.job import Job
from redis import Redis
from flask import current_app as app
def enqueue_job(delayed_fn, *args, **kwargs):
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+ job = Job.create(
+ delayed_fn, args, **{
+ key: val for key, val in kwargs.items()
+ if key != "additional_jobs_meta"
+ },
+ connection = rconn)
+ job.meta["status"] = "enqueued"
+ job.meta["progress"] = 0
+ if "additional_jobs_meta" in kwargs:
+ for key, val in kwargs["additional_jobs_meta"].items():
+ job.meta[key] = val
+
+ job.save_meta()
+
queue = Queue("qcapp_queue", connection=rconn)
- job = queue.enqueue(delayed_fn, *args, **kwargs)
+ queue.enqueue_job(job)
- job.meta["status"] = "enqueued"
- job.save_meta()
return job
def job(job_id):
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
queue = Queue("qcapp_queue", connection=rconn)
job = queue.fetch_job(job_id)
+ job.refresh()
+
+ return job
+
+def update_meta(stale_job, **kwargs):
+ with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+ queue = Queue("qcapp_queue", connection=rconn)
+ job = queue.fetch_job(stale_job.get_id())
+ job.refresh()
+ meta_dict = {**stale_job.meta, **job.meta, **kwargs}
+ for key, val in meta_dict.items():
+ job.meta[key] = val
+
+ job.save_meta()
return job
diff --git a/qc_app/parse.py b/qc_app/parse.py
index 1ebe637..ffdd123 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -16,37 +16,37 @@ from quality_control.parsing import (
FileType,
parse_file,
strain_names,
+ parse_errors,
parse_strains)
parsebp = Blueprint("parse", __name__)
def queued_parse(filepath, filetype):
+ from qc_app import create_app, instance_path
+ app = create_app(instance_path())
+ app.app_context().push()
+
job = get_current_job()
- job.meta["filename"] = os.path.basename(filepath)
- job.meta["status"] = "in-progress"
+ jobs.update_meta(job, status = "in-progress", progress = 0)
job.save_meta()
- filesize = os.stat(filepath).st_size
try:
parsed = parse_file(
filepath, filetype, strain_names(parse_strains("strains.csv")))
for line, curr_size in parsed:
- job.meta["progress"] = (curr_size/filesize) * 100
- job.meta["status"] = f"Parsed {curr_size} bytes"
- job.save_meta()
+ jobs.update_meta(
+ job, progress = (curr_size/job.meta["filesize"]) * 100,
+ message = f"Parsed {curr_size} bytes")
os.remove(filepath)
- job.meta["progress"] = 100
- job.meta["status"] = "success"
- job.meta["results"] = {"message": "no errors found"}
- job.save_meta()
+ jobs.update_meta(
+ job, progress = 10, status = "success", message = "no errors found")
except ParseError as pe:
pe_dict = pe.args[0]
- job.meta["status"] = "parse-error"
- job.meta["results"] = {
- "filename": filename, "filetype": filetype,
- "position": pe_dict["position"]
- }
- job.save_meta()
+ jobs.update_meta(
+ job, status = "parse-error", results = {
+ "filename": os.path.basename(filepath), "filetype": filetype,
+ "position": pe_dict["position"]
+ })
@parsebp.route("/parse", methods=["GET"])
def parse():
@@ -73,9 +73,10 @@ def parse():
filetype = (
FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR)
- job = jobs.enqueue_job("qc_app.parse.queued_parse", filepath, filetype)
- job.meta["filename"] = filename
- job.save_meta()
+ job = jobs.enqueue_job(
+ "qc_app.parse.queued_parse", filepath, filetype,
+ additional_jobs_meta = {
+ "filename": filename, "filesize": os.stat(filepath).st_size})
return redirect(url_for("parse.parse_status", job_id=job.get_id()))
@parsebp.route("/status/<job_id>", methods=["GET"])
@@ -83,7 +84,7 @@ def parse_status(job_id):
job = jobs.job(job_id)
if job:
job_id = job.get_id()
- progress = job.meta.get("progress", 0)
+ progress = job.meta["progress"]
status = job.meta["status"]
filename = job.meta.get("filename", "uploaded file")
if status == "success":
@@ -97,6 +98,7 @@ def parse_status(job_id):
job_id = job_id,
job_status = status,
progress = progress,
+ message = job.meta.get("message", ""),
job_name = f"Parsing '{filename}'")
return render_template("no_such_job.html", job_id=job_id)
@@ -106,7 +108,47 @@ def results(job_id):
"""Indicates success if parsing the file is successful"""
return "STUB: Parse success!!!"
-@parsebp.route("/fail", methods=["GET"])
-def fail():
- """Indicates success if parsing the file is successful"""
- return "STUB: Parse Failure!!!"
+def queued_collect_errors(filepath, filetype, seek_pos=0):
+ from qc_app import create_app, instance_path
+ app = create_app(instance_path())
+ app.app_context().push()
+
+ job = get_current_job()
+ errors = []
+ count = 0
+
+ for error in parse_errors(
+ filepath, filetype, strain_names(parse_strains("strains.csv")),
+ seek_pos):
+ count = count + 1
+ jobs.update_meta(
+ job, message = f"Collected {count} errors", progress = (
+ (error["position"] / job.meta["filesize"]) * 100))
+ errors.append(error)
+
+ jobs.update_meta(job, errors = errors, progress = 100, status = "success")
+
+@parsebp.route("/fail/<job_id>", methods=["GET"])
+def fail(job_id):
+ """Handle parsing failure"""
+ old_job = jobs.job(job_id)
+ if old_job:
+ old_job.refresh()
+ job_id = old_job.get_id()
+ progress = old_job.meta.get("progress", 0)
+ status = old_job.meta["status"]
+ results = old_job.meta["results"]
+ filename = old_job.meta.get("filename", "uploaded file")
+
+ new_job = jobs.enqueue_job(
+ "qc_app.parse.queued_collect_errors",
+ os.path.join(
+ app.config["UPLOAD_FOLDER"], old_job.meta["filename"]),
+ results["filetype"], results["position"],
+ additional_jobs_meta = {
+ "status": "Collecting Errors",
+ "filename": old_job.meta["filename"],
+ "filesize": old_job.meta["filesize"]})
+ return redirect(url_for("parse.parse_status", job_id=new_job.get_id()))
+
+ return render_template("no_such_job.html", job_id=job_id)
diff --git a/qc_app/templates/job_progress.html b/qc_app/templates/job_progress.html
index 1c6aa32..26dfe6f 100644
--- a/qc_app/templates/job_progress.html
+++ b/qc_app/templates/job_progress.html
@@ -10,7 +10,7 @@
<h1 class="heading">{{job_name}}</h2>
<label for="job_status">status:</label>
-<span>{{job_status}}</span><br />
+<span>{{job_status}}: {{message}}</span><br />
<label for="job_{{job_id}}">parsing: </label>
<progress id="job_{{job_id}}" value="{{progress}}">{{progress}}</progress>