author     Frederick Muriuki Muriithi  2022-05-19 15:26:15 +0300
committer  Frederick Muriuki Muriithi  2022-05-19 15:30:20 +0300
commit     2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c (patch)
tree       9e0fdcc010925a9f6d5674d41991b6185aea54e3
parent     27f6e9e28f2a3244bdd00336cf918de97b2ceed6 (diff)
Update Web-UI: use new error collection paradigm
- README.org: document how to run scripts manually
- manifest.scm: remove python-rq as a dependency
- qc_app/jobs.py: rework job launching and processing (see the sketch after this list)
- qc_app/parse.py: use reworked job processing
- qc_app/templates/job_progress.html: display progress correctly
- qc_app/templates/parse_results.html: display final results
- scripts/worker.py: new worker script
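
For orientation, a minimal sketch of the job hash the reworked code keeps in
Redis. The field names are those used in qc_app/jobs.py and scripts/worker.py
below; the values are illustrative only:

#+BEGIN_SRC python
# Illustrative job hash (values are placeholders, not real output).
job = {
    "job_id": "<uuid4>",   # assigned by jobs.launch_job
    "command": "python3 -m scripts.worker <filetype> <filepath> <redisurl> <job_id>",
    "status": "pending",   # the worker sets "Processing", then "success"
    "filename": "<uploaded file's basename>",
    "percent": 0,          # updated by the worker's progress indicator
    # on completion the worker also sets "message" and a jsonpickle-encoded
    # "errors" field that the results page decodes
}
#+END_SRC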
 README.org                          |  22
 manifest.scm                        |   1
 qc_app/jobs.py                      |  96
 qc_app/parse.py                     | 237
 qc_app/templates/job_progress.html  |   2
 qc_app/templates/parse_results.html |  48
 scripts/worker.py                   |  88
 7 files changed, 233 insertions(+), 261 deletions(-)
diff --git a/README.org b/README.org
index 390320a..55e70df 100644
--- a/README.org
+++ b/README.org
@@ -41,28 +41,10 @@ application.
 
 *** Run the CLI version
 
-To run the CLI version of the application, we need to set up the correct ~PYTHONPATH~
+Run the CLI version of the application with:
 #+BEGIN_SRC shell
-export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$(pwd)"
+python3 -m scripts.qc --help
 #+END_SRC
-which enables us to run the script with
-#+BEGIN_SRC shell
-python3 scripts/qc.py --help
-#+END_SRC
-
-Without setting up the ~PYTHONPATH~ environment variable, you might get an error
-like the following
-#+BEGIN_EXAMPLE shell
-$ python3 scripts/qc.py --help
-Traceback (most recent call last):
-  File "/home/frederick/genenetwork/gnqc_py/scripts/qc.py", line 8, in <module>
-    from quality_control.errors import ParseError
-ModuleNotFoundError: No module named 'quality_control'
-#+END_EXAMPLE
-
-*NOTE*: Setting up the ~PYTHONPATH~ environment variable is only necessary for
-the development environment. It *MUST NOT* require that the end-user set this up
-at all!
 
 *** Run the web version
 
diff --git a/manifest.scm b/manifest.scm
index 2062b07..42d95bb 100644
--- a/manifest.scm
+++ b/manifest.scm
@@ -1,6 +1,5 @@
 (specifications->manifest
  (list "python"
-       "python-rq"
        "python-mypy"
        "python-redis"
        "python-flask"
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index afea419..4d3dba6 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,82 +1,32 @@
-import jsonpickle
-import sqlite3
+import os
+import shlex
+import subprocess
+from uuid import uuid4
 
-from rq import Queue
-from rq.job import Job
 from redis import Redis
-from flask import current_app as app
 
-def enqueue_job(delayed_fn: str, *args, **kwargs):
-    """Add job to queue"""
-    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
-        job = Job.create(
-            delayed_fn, args, **{
-                key: val for key, val in kwargs.items()
-                if key != "additional_jobs_meta"
-            },
-            connection = rconn,
-            timeout = "2h",
-            result_ttl = "5h",
-            failure_ttl = "5h"
-        )
+def error_filename(job_id, error_dir):
+    """Compute the path of the file holding a job's stderr output."""
+    return f"{error_dir}/job_{job_id}.error"
 
-        queue = Queue("qcapp_queue", connection=rconn)
-        queue.enqueue_job(job)
+def launch_job(redis_conn: Redis, filepath, filetype, redisurl, error_dir):
+    """Launch a job in the background"""
+    job_id = str(uuid4())
+    command = [
+        "python3", "-m", "scripts.worker", filetype, filepath, redisurl, job_id]
+    job = {
+        "job_id": job_id, "command": shlex.join(command), "status": "pending",
+        "filename": os.path.basename(filepath), "percent": 0
+    }
+    redis_conn.hset(name=job["job_id"], mapping=job)
 
-    return job
+    os.makedirs(error_dir, exist_ok=True)
 
-def job(job_id: str):
-    "Retrieve the job"
-    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
-        queue = Queue("qcapp_queue", connection=rconn)
-        job = queue.fetch_job(job_id)
-        job.refresh()
+    with open(error_filename(job_id, error_dir), "w") as errorfile:
+        subprocess.Popen(command, stderr=errorfile)
 
     return job
 
-def create_jobs_table(dbpath: str):
-    """Create sqlite3 table to track job progress"""
-    conn = sqlite3.connect(dbpath)
-    cursor = conn.cursor()
-    cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
-    cursor.close()
-    conn.close()
-
-def retrieve_meta(conn, job_id: str):
-    """Retrieve the job's metadata."""
-    meta = {}
-    try:
-        cursor = conn.cursor()
-        cursor.execute(
-            "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
-        db_row = cursor.fetchone()
-        # meta = json.loads(db_row [1]) if db_row else {}
-        meta = jsonpickle.decode(db_row [1]) if db_row else {}
-    except Exception as exc:
-        cursor.close()
-        raise exc
-    finally:
-        cursor.close()
-
-    return meta
-
-def update_meta(conn, job_id: str, **meta):
-    """Update the job's metadata."""
-    try:
-        cursor = conn.cursor()
-        old_meta = retrieve_meta(conn, job_id)
-        meta = {**old_meta, **meta}
-        query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
-        if not bool(old_meta):
-            query = "INSERT INTO jobs VALUES (:job_id, :meta)"
-
-        cursor.execute(
-            query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
-        conn.commit()
-    except Exception as exc:
-        cursor.close()
-        raise exc
-    finally:
-        cursor.close()
-
-    return meta
+def job(redis_conn, job_id: str):
+    "Retrieve the job"
+    return redis_conn.hgetall(job_id)
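
A minimal usage sketch of the reworked API, assuming a reachable Redis server;
the file and directory paths here are placeholders:

#+BEGIN_SRC python
# Sketch: launch a background QC job and read back its state.
from redis import Redis
from qc_app import jobs

with Redis.from_url("redis:///", decode_responses=True) as rconn:
    job = jobs.launch_job(
        rconn,
        "/tmp/uploads/data.tsv",    # placeholder upload path
        "average",                  # or "standard-error"
        "redis:///",
        "/tmp/uploads/job_errors")  # per-job stderr files land here
    print(jobs.job(rconn, job["job_id"])["status"])  # "pending"
#+END_SRC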
diff --git a/qc_app/parse.py b/qc_app/parse.py
index baad9a6..d2fa030 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -1,10 +1,9 @@
 """File parsing module"""
 import os
-import sqlite3
 from functools import reduce
 
+import jsonpickle
 from redis import Redis
-from rq import get_current_job
 from flask import (
     request,
     url_for,
@@ -14,50 +13,13 @@ from flask import (
     current_app as app)
 
 from . import jobs
-from quality_control.errors import ParseError
+from quality_control.errors import InvalidValue
 from quality_control.parsing import (
     FileType,
-    parse_file,
-    strain_names,
-    parse_errors)
+    strain_names)
 
 parsebp = Blueprint("parse", __name__)
 
-def queued_parse(
-        filepath: str, filetype: FileType, strainsfile: str, redis_url: str,
-        dbpath: str):
-    job = get_current_job()
-    job_id = job.get_id()
-    with Redis.from_url(redis_url) as rconn:
-        dbconn = sqlite3.connect(dbpath)
-        try:
-            job_meta = jobs.update_meta(
-                dbconn, job_id, status = "in-progress", progress = 0)
-            parsed = parse_file(filepath, filetype, strain_names(strainsfile))
-            for line, curr_size in parsed:
-                job_meta = jobs.update_meta(
-                    dbconn, job_id,
-                    progress=((curr_size/job_meta["filesize"]) * 100),
-                    message = f"Parsed {curr_size} bytes")
-
-            os.remove(filepath)
-            jobs_meta = jobs.update_meta(
-                dbconn, job_id, progress = 100, status = "success",
-                message = "no errors found")
-        except ParseError as pe:
-            pe_dict = pe.args[0]
-            job_meta = jobs.update_meta(
-                dbconn, job_id, status = "parse-error", results = {
-                    "filename": os.path.basename(filepath), "filetype": filetype,
-                    "position": pe_dict["position"],
-                    "line_number": pe_dict["line_number"]
-                })
-        finally:
-            dbconn.close()
-
-def retrieve_dbpath():
-    return os.path.join(app.config["UPLOAD_FOLDER"], "jobs.db")
-
 @parsebp.route("/parse", methods=["GET"])
 def parse():
     """Trigger file parsing"""
@@ -73,6 +35,10 @@ def parse():
         flash("No filetype provided", "alert-error")
         errors = True
 
+    if filetype not in ("average", "standard-error"):
+        flash("Invalid filetype provided", "alert-error")
+        errors = True
+
     filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename)
     if not os.path.exists(filepath):
         flash("Selected file does not exist (any longer)", "alert-danger")
@@ -81,144 +47,83 @@ def parse():
     if errors:
         return redirect(url_for("entry.index"))
 
-    jobs.create_jobs_table(retrieve_dbpath())
-    filetype = (
-        FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR)
-    job = jobs.enqueue_job(
-        "qc_app.parse.queued_parse", filepath, filetype,
-        os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"],
-        retrieve_dbpath())
-    try:
-        dbconn = sqlite3.connect(retrieve_dbpath())
-        jobs.update_meta(
-            dbconn, job.get_id(), filename=filename, filesize=os.stat(filepath).st_size,
-            status="enqueued", progress=0)
-    except Exception as exc:
-        import traceback
-        print(traceback.format_exc())
-        dbconn.rollback()
-    finally:
-        dbconn.close()
-
-    return redirect(url_for("parse.parse_status", job_id=job.get_id()))
+    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+        job = jobs.launch_job(
+            rconn, filepath, filetype, app.config["REDIS_URL"],
+        f"{app.config['UPLOAD_FOLDER']}/job_errors")
+
+    return redirect(url_for("parse.parse_status", job_id=job["job_id"]))
 
 @parsebp.route("/status/<job_id>", methods=["GET"])
 def parse_status(job_id: str):
-    try:
-        dbconn = sqlite3.connect(retrieve_dbpath(), timeout=10)
-        job = jobs.job(job_id)
-        if job:
-            job_id = job.get_id()
-            job_meta = jobs.retrieve_meta(dbconn, job_id)
-            progress = job_meta["progress"]
-            status = job_meta["status"]
-            filename = job_meta.get("filename", "uploaded file")
-            if status == "success":
-                return redirect(url_for("parse.results", job_id=job_id))
-
-            if status == "parse-error":
+    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+        job = jobs.job(rconn, job_id)
+
+    if job:
+        error_filename = jobs.error_filename(
+            job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
+        if os.path.exists(error_filename):
+            stat = os.stat(error_filename)
+            if stat.st_size > 0:
                 return redirect(url_for("parse.fail", job_id=job_id))
 
-            return render_template(
-                "job_progress.html",
-                job_id = job_id,
-                job_status = status,
-                progress = progress,
-                message = job_meta.get("message", ""),
-                job_name = f"Parsing '{filename}'")
-
-        return render_template("no_such_job.html", job_id=job_id)
-    except sqlite3.OperationalError:
-        return redirect(url_for("parse.parse_status", job_id=job_id))
-    except Exception as exc:
-        import traceback
-        print(traceback.format_exc())
-        return exc
-    finally:
-        dbconn.close()
+        print(f"THE JOB {job}")
+        job_id = job["job_id"]
+        progress = float(job["percent"])
+        status = job["status"]
+        filename = job.get("filename", "uploaded file")
+        if status == "success":
+            return redirect(url_for("parse.results", job_id=job_id))
+
+        if status == "parse-error":
+            return redirect(url_for("parse.fail", job_id=job_id))
+
+        return render_template(
+            "job_progress.html",
+            job_id = job_id,
+            job_status = status,
+            progress = progress,
+            message = job.get("message", ""),
+            job_name = f"Parsing '{filename}'")
+
+    return render_template("no_such_job.html", job_id=job_id)
 
 @parsebp.route("/results/<job_id>", methods=["GET"])
 def results(job_id: str):
     """Show results of parsing..."""
-    job = jobs.job(job_id)
+    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+        job = jobs.job(rconn, job_id)
+
     if job:
-        try:
-            dbconn = sqlite3.connect(retrieve_dbpath())
-            job_meta = jobs.retrieve_meta(dbconn, job_id)
-            filename = job_meta["filename"]
-            errors = job_meta.get("errors", [])
-            return render_template(
-                "parse_results.html",
-                errors=errors,
-                job_name = f"Parsing '{filename}'",
-                starting_line_number=job_meta.get("starting_line_number", 0))
-        except Exception as exc:
-            import traceback
-            print(traceback.format_exc())
-        finally:
-            dbconn.close()
+        filename = job["filename"]
+        errors = jsonpickle.decode(job["errors"])
+        return render_template(
+            "parse_results.html",
+            errors=errors,
+            job_name = f"Parsing '{filename}'",
+            isinvalidvalue=lambda item: isinstance(item, InvalidValue))
 
     return render_template("no_such_job.html", job_id=job_id)
 
-def queued_collect_errors(
-        filepath: str, filetype: FileType, strainsfile: str, redis_url: str,
-        dbpath: str, seek_pos: int = 0):
-    job = get_current_job()
-    job_id = job.get_id()
-    errors = []
-    count = 0
-    with Redis.from_url(redis_url) as rconn:
-        try:
-            dbconn = sqlite3.connect(dbpath)
-            job_meta = jobs.retrieve_meta(dbconn, job.get_id())
-            for error in parse_errors(
-                    filepath, filetype, strain_names(strainsfile),
-                    seek_pos):
-                count = count + 1
-                progress  = ((error["position"] / job_meta["filesize"]) * 100)
-                job_meta = jobs.update_meta(
-                    dbconn, job_id, message = f"Collected {count} errors",
-                    progress = progress)
-                errors.append(error)
-
-            job_meta = jobs.update_meta(
-                dbconn, job_id, errors = errors, progress = 100, status = "success")
-            os.remove(filepath)
-        except Exception as exc:
-            dbconn.rollback()
-        finally:
-            dbconn.close()
-
 @parsebp.route("/fail/<job_id>", methods=["GET"])
 def fail(job_id: str):
     """Handle parsing failure"""
-    try:
-        dbpath = retrieve_dbpath()
-        dbconn = sqlite3.connect(dbpath)
-        old_job = jobs.job(job_id)
-        if old_job:
-            job_id = old_job.get_id()
-            old_meta = jobs.retrieve_meta(dbconn, job_id)
-            progress = old_meta["progress"]
-            status = old_meta["status"]
-            results = old_meta["results"]
-            filename = old_meta["filename"]
-
-            new_job = jobs.enqueue_job(
-                "qc_app.parse.queued_collect_errors",
-                os.path.join(
-                    app.config["UPLOAD_FOLDER"], old_meta["filename"]),
-                results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"),
-                app.config["REDIS_URL"], dbpath, results["position"])
-            jobs.update_meta(
-                dbconn, new_job.get_id(), status =  "Collecting Errors",
-                filename = old_meta["filename"], filesize = old_meta["filesize"],
-                starting_line_number = results["line_number"],
-                progress = progress)
-            return redirect(url_for("parse.parse_status", job_id=new_job.get_id()))
-
-        return render_template("no_such_job.html", job_id=job_id)
-    except Exception as exc:
-        dbconn.rollback()
-    finally:
-        dbconn.close()
+    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+        old_job = jobs.job(rconn, job_id)
+
+    if old_job:
+        error_filename = jobs.error_filename(
+            job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
+        if os.path.exists(error_filename):
+            stat = os.stat(error_filename)
+            if stat.st_size > 0:
+                with open(error_filename) as errorfile:
+                    return errorfile.read()
+
+    return render_template("no_such_job.html", job_id=job_id)
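
The failure signal in this scheme is the per-job stderr file: launch_job
redirects the worker's stderr there, so a non-empty file means the worker
crashed. A hypothetical helper mirroring the check that both parse_status and
fail perform above:

#+BEGIN_SRC python
# Sketch of the failure check (error_dir is a placeholder argument).
import os
from qc_app import jobs

def job_failed(job_id: str, error_dir: str) -> bool:
    """Return True if the worker wrote anything to its stderr file."""
    path = jobs.error_filename(job_id, error_dir)
    return os.path.exists(path) and os.stat(path).st_size > 0
#+END_SRC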
diff --git a/qc_app/templates/job_progress.html b/qc_app/templates/job_progress.html
index 26dfe6f..8a9256f 100644
--- a/qc_app/templates/job_progress.html
+++ b/qc_app/templates/job_progress.html
@@ -13,7 +13,7 @@
 <span>{{job_status}}: {{message}}</span><br />
 
 <label for="job_{{job_id}}">parsing: </label>
-<progress id="job_{{job_id}}" value="{{progress}}">{{progress}}</progress>
+<progress id="job_{{job_id}}" value="{{progress/100}}">{{progress}}</progress>
 <span>{{"%.2f" | format(progress)}}%</span>
 
 {%endblock%}
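
The division by 100 matters because a ~progress~ element with no explicit
~max~ attribute treats ~value~ as a fraction of 1.0, while the job hash tracks
~percent~ on a 0-100 scale.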
diff --git a/qc_app/templates/parse_results.html b/qc_app/templates/parse_results.html
new file mode 100644
index 0000000..a750bb5
--- /dev/null
+++ b/qc_app/templates/parse_results.html
@@ -0,0 +1,48 @@
+{%extends "base.html"%}
+
+{%block title%}Parse Results{%endblock%}
+
+{%block contents%}
+<h1 class="heading">{{job_name}}: parse results</h2>
+
+{%if errors | length == 0 %}
+<span class="alert-success">No errors found in the file</span>
+{%else %}
+<p class="alert-error">We found the following errors</p>
+
+<table class="reports-table">
+  <thead>
+    <tr>
+      <th>line number</th>
+      <th>column(s)</th>
+      <th>error</th>
+      <th>error message</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    {%for error in errors%}
+    <tr>
+      <td>{{error["line"]}}</td>
+      <td>
+	{%if isinvalidvalue(error)%}
+	{{error.column}}
+	{%else%}
+	{{error.columns}}
+	{%endif%}
+      </td>
+      <td>
+	{%if isinvalidvalue(error)%}
+	Invalid Value
+	{%else%}
+	Duplicate Header
+	{%endif%}
+      </td>
+      <td>{{error["message"]}}</td>
+    </tr>
+    {%endfor%}
+  </tbody>
+</table>
+{%endif%}
+
+{%endblock%}
diff --git a/scripts/worker.py b/scripts/worker.py
new file mode 100644
index 0000000..f4d5b6b
--- /dev/null
+++ b/scripts/worker.py
@@ -0,0 +1,88 @@
+import os
+import sys
+import traceback
+from typing import Callable
+
+import jsonpickle
+from redis import Redis
+from redis.exceptions import ConnectionError
+
+from .qc import cli_argument_parser
+from quality_control.utils import make_progress_calculator
+from quality_control.parsing import (
+    take, FileType, strain_names, collect_errors)
+
+
+def make_progress_indicator(
+        redis_connection: Redis, job_id: str, progress_calc_fn: Callable) -> Callable:
+    """Return a callback that records parse progress in the job's Redis hash."""
+    def __indicator__(linenumber, linetext):
+        progress = progress_calc_fn(linenumber, linetext)
+        redis_connection.hset(name=job_id, mapping=progress._asdict())
+
+        return progress
+
+    return __indicator__
+
+def cli_args_valid(args):
+    """Validate the command-line arguments, returning them, or None on error."""
+    if not os.path.exists(args.filepath):
+        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
+        return None
+
+    if not os.path.exists(args.strainsfile):
+        print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr)
+        return None
+
+    try:
+        with Redis.from_url(args.redisurl) as conn:
+            conn.ping()  # from_url is lazy; force a round-trip to the server
+    except ConnectionError:
+        print(traceback.format_exc(), file=sys.stderr)
+        return None
+
+    return args
+
+def process_cli_arguments():
+    parser = cli_argument_parser()
+    parser.prog = "worker"
+    parser.add_argument(
+        "redisurl", default="redis:///", help="URL to the redis server")
+    parser.add_argument("job_id", help="The id of the job being processed")
+
+    return cli_args_valid(parser.parse_args())
+
+def main():
+    args = process_cli_arguments()
+    if args is None:
+        print("Quiting due to errors!", file=sys.stderr)
+        return 1
+
+    with Redis.from_url(args.redisurl) as redis_conn:
+        progress_calculator = make_progress_calculator(
+            os.stat(args.filepath).st_size)
+        progress_indicator = make_progress_indicator(
+            redis_conn, args.job_id, progress_calculator)
+        count = args.count
+        filepath = args.filepath
+        filetype = (
+            FileType.AVERAGE if args.filetype == "average"
+            else FileType.STANDARD_ERROR)
+        strains = strain_names(args.strainsfile)
+
+        redis_conn.hset(
+            name=args.job_id, key="status", value="Processing")
+        redis_conn.hset(
+            name=args.job_id, key="message", value="Collecting errors")
+
+        if count > 0:
+            errors = take(
+                collect_errors(filepath, filetype, strains, progress_indicator),
+                count)
+        else:
+            errors = collect_errors(filepath, filetype, strains, progress_indicator)
+
+        redis_conn.hset(
+            name=args.job_id, key="errors", value=jsonpickle.encode(errors))
+        redis_conn.hset(name=args.job_id, key="status", value="success")
+
+    return 0
+
+if __name__ == "__main__":
+    main()
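
The worker is invoked as python3 -m scripts.worker <filetype> <filepath>
<redisurl> <job_id>, mirroring the command assembled in qc_app/jobs.py. A
sketch of watching a running job from another process, assuming the status and
percent fields the worker maintains:

#+BEGIN_SRC python
# Sketch: poll a job's Redis hash until the worker reports completion.
import time
from redis import Redis

def watch(job_id: str, redisurl: str = "redis:///"):
    with Redis.from_url(redisurl, decode_responses=True) as rconn:
        status = rconn.hget(job_id, "status")
        while status not in (None, "success"):
            print(f"{rconn.hget(job_id, 'percent')}% - {status}")
            time.sleep(1)
            status = rconn.hget(job_id, "status")
#+END_SRC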