about summary refs log tree commit diff
path: root/qc_app
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-04-26 15:18:26 +0300
committerFrederick Muriuki Muriithi2022-04-26 15:18:26 +0300
commitea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d (patch)
treef4dc7e29b8e49a1cbf2412d0bcab55294957e021 /qc_app
parente6895f5bac672d2e1d2a04fe8118fa55c3a40b91 (diff)
downloadgn-uploader-ea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d.tar.gz
Fix issues caught processing the jobs
* Create and push the application context for the worker functions
* Fix the update of meta fields
Diffstat (limited to 'qc_app')
-rw-r--r--qc_app/__init__.py10
-rw-r--r--qc_app/jobs.py33
-rw-r--r--qc_app/parse.py90
-rw-r--r--qc_app/templates/job_progress.html2
4 files changed, 107 insertions, 28 deletions
diff --git a/qc_app/__init__.py b/qc_app/__init__.py
index 35cc422..7f423c2 100644
--- a/qc_app/__init__.py
+++ b/qc_app/__init__.py
@@ -7,6 +7,16 @@ from flask import Flask
 from .entry import entrybp
 from .parse import parsebp
 
+def instance_path():
+    """Retrieve the `instance_path`. Raise an exception if not defined."""
+    path = os.getenv("QCAPP_INSTANCE_PATH", None)
+    if path is None:
+        raise Exception((
+            "Configuration Error: Set the `QCAPP_INSTANCE_PATH` environment "
+            "variable."))
+
+    return path
+
 def create_app(instance_path):
     """The application factory"""
     app = Flask(
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index dbeb9ce..908c244 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,19 +1,46 @@
 from rq import Queue
+from rq.job import Job
 from redis import Redis
 from flask import current_app as app
 
 def enqueue_job(delayed_fn, *args, **kwargs):
     with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+        job = Job.create(
+            delayed_fn, args, **{
+                key: val for key, val in kwargs.items()
+                if key != "additional_jobs_meta"
+            },
+            connection = rconn)
+        job.meta["status"] = "enqueued"
+        job.meta["progress"] = 0
+        if "additional_jobs_meta" in kwargs:
+            for key, val in kwargs["additional_jobs_meta"].items():
+                job.meta[key] = val
+
+        job.save_meta()
+
         queue = Queue("qcapp_queue", connection=rconn)
-        job =  queue.enqueue(delayed_fn, *args, **kwargs)
+        queue.enqueue_job(job)
 
-    job.meta["status"] = "enqueued"
-    job.save_meta()
     return job
 
 def job(job_id):
     with Redis.from_url(app.config["REDIS_URL"]) as rconn:
         queue = Queue("qcapp_queue", connection=rconn)
         job = queue.fetch_job(job_id)
+        job.refresh()
+
+    return job
+
+def update_meta(stale_job, **kwargs):
+    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+        queue = Queue("qcapp_queue", connection=rconn)
+        job = queue.fetch_job(stale_job.get_id())
+        job.refresh()
+        meta_dict = {**stale_job.meta, **job.meta, **kwargs}
+        for key, val in meta_dict.items():
+            job.meta[key] = val
+
+        job.save_meta()
 
     return job
diff --git a/qc_app/parse.py b/qc_app/parse.py
index 1ebe637..ffdd123 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -16,37 +16,37 @@ from quality_control.parsing import (
     FileType,
     parse_file,
     strain_names,
+    parse_errors,
     parse_strains)
 
 parsebp = Blueprint("parse", __name__)
 
 def queued_parse(filepath, filetype):
+    from qc_app import create_app, instance_path
+    app = create_app(instance_path())
+    app.app_context().push()
+
     job = get_current_job()
-    job.meta["filename"] = os.path.basename(filepath)
-    job.meta["status"] = "in-progress"
+    jobs.update_meta(job, status = "in-progress", progress = 0)
     job.save_meta()
-    filesize = os.stat(filepath).st_size
     try:
         parsed = parse_file(
             filepath, filetype, strain_names(parse_strains("strains.csv")))
         for line, curr_size in parsed:
-            job.meta["progress"] = (curr_size/filesize) * 100
-            job.meta["status"] = f"Parsed {curr_size} bytes"
-            job.save_meta()
+            jobs.update_meta(
+                job, progress = (curr_size/job.meta["filesize"]) * 100,
+                message = f"Parsed {curr_size} bytes")
 
         os.remove(filepath)
-        job.meta["progress"] = 100
-        job.meta["status"] = "success"
-        job.meta["results"] = {"message": "no errors found"}
-        job.save_meta()
+        jobs.update_meta(
+            job, progress = 10, status = "success", message = "no errors found")
     except ParseError as pe:
         pe_dict = pe.args[0]
-        job.meta["status"] = "parse-error"
-        job.meta["results"] = {
-            "filename": filename, "filetype": filetype,
-            "position": pe_dict["position"]
-        }
-        job.save_meta()
+        jobs.update_meta(
+            job, status = "parse-error", results = {
+                "filename": os.path.basename(filepath), "filetype": filetype,
+                "position": pe_dict["position"]
+            })
 
 @parsebp.route("/parse", methods=["GET"])
 def parse():
@@ -73,9 +73,10 @@ def parse():
 
     filetype = (
         FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR)
-    job = jobs.enqueue_job("qc_app.parse.queued_parse", filepath, filetype)
-    job.meta["filename"] = filename
-    job.save_meta()
+    job = jobs.enqueue_job(
+        "qc_app.parse.queued_parse", filepath, filetype,
+        additional_jobs_meta = {
+            "filename": filename, "filesize": os.stat(filepath).st_size})
     return redirect(url_for("parse.parse_status", job_id=job.get_id()))
 
 @parsebp.route("/status/<job_id>", methods=["GET"])
@@ -83,7 +84,7 @@ def parse_status(job_id):
     job = jobs.job(job_id)
     if job:
         job_id = job.get_id()
-        progress = job.meta.get("progress", 0)
+        progress = job.meta["progress"]
         status = job.meta["status"]
         filename = job.meta.get("filename", "uploaded file")
         if status == "success":
@@ -97,6 +98,7 @@ def parse_status(job_id):
             job_id = job_id,
             job_status = status,
             progress = progress,
+            message = job.meta.get("message", ""),
             job_name = f"Parsing '{filename}'")
 
     return render_template("no_such_job.html", job_id=job_id)
@@ -106,7 +108,47 @@ def results(job_id):
     """Indicates success if parsing the file is successful"""
     return "STUB: Parse success!!!"
 
-@parsebp.route("/fail", methods=["GET"])
-def fail():
-    """Indicates success if parsing the file is successful"""
-    return "STUB: Parse Failure!!!"
+def queued_collect_errors(filepath, filetype, seek_pos=0):
+    from qc_app import create_app, instance_path
+    app = create_app(instance_path())
+    app.app_context().push()
+
+    job = get_current_job()
+    errors = []
+    count = 0
+
+    for error in parse_errors(
+            filepath, filetype, strain_names(parse_strains("strains.csv")),
+            seek_pos):
+        count = count + 1
+        jobs.update_meta(
+            job, message = f"Collected {count} errors", progress = (
+                (error["position"] / job.meta["filesize"]) * 100))
+        errors.append(error)
+
+    jobs.update_meta(job, errors = errors, progress = 100, status = "success")
+
+@parsebp.route("/fail/<job_id>", methods=["GET"])
+def fail(job_id):
+    """Handle parsing failure"""
+    old_job = jobs.job(job_id)
+    if old_job:
+        old_job.refresh()
+        job_id = old_job.get_id()
+        progress = old_job.meta.get("progress", 0)
+        status = old_job.meta["status"]
+        results = old_job.meta["results"]
+        filename = old_job.meta.get("filename", "uploaded file")
+
+        new_job = jobs.enqueue_job(
+            "qc_app.parse.queued_collect_errors",
+            os.path.join(
+                app.config["UPLOAD_FOLDER"], old_job.meta["filename"]),
+            results["filetype"], results["position"],
+            additional_jobs_meta = {
+                "status": "Collecting Errors",
+                "filename": old_job.meta["filename"],
+                "filesize": old_job.meta["filesize"]})
+        return redirect(url_for("parse.parse_status", job_id=new_job.get_id()))
+
+    return render_template("no_such_job.html", job_id=job_id)
diff --git a/qc_app/templates/job_progress.html b/qc_app/templates/job_progress.html
index 1c6aa32..26dfe6f 100644
--- a/qc_app/templates/job_progress.html
+++ b/qc_app/templates/job_progress.html
@@ -10,7 +10,7 @@
 <h1 class="heading">{{job_name}}</h2>
 
 <label for="job_status">status:</label>
-<span>{{job_status}}</span><br />
+<span>{{job_status}}: {{message}}</span><br />
 
 <label for="job_{{job_id}}">parsing: </label>
 <progress id="job_{{job_id}}" value="{{progress}}">{{progress}}</progress>