about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-04-28 15:32:35 +0300
committerFrederick Muriuki Muriithi2022-04-28 15:32:35 +0300
commit5632dcab27058875de99d63cbd263acfa3a9a2d5 (patch)
tree93cce3204086c14760c3e76497b8b106100b4a96
parentdefc1cf0c1635f3262200a9ba25d8bd0c6fc0a93 (diff)
downloadgn-uploader-5632dcab27058875de99d63cbd263acfa3a9a2d5.tar.gz
Use sqlite3 to save the jobs metadata
* Use sqlite to save the jobs metadata and enable UI update of the
  progress for large files
-rw-r--r--manifest.scm3
-rw-r--r--mypy.ini15
-rw-r--r--qc_app/jobs.py71
-rw-r--r--qc_app/parse.py217
4 files changed, 209 insertions, 97 deletions
diff --git a/manifest.scm b/manifest.scm
index 9ec50a6..2062b07 100644
--- a/manifest.scm
+++ b/manifest.scm
@@ -7,4 +7,5 @@
        "python-magic"
        "python-pylint"
        "python-pytest"
-       "python-hypothesis"))
+       "python-hypothesis"
+       "python-jsonpickle"))
diff --git a/mypy.ini b/mypy.ini
index ba70688..f61431a 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -7,4 +7,19 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 
 [mypy-hypothesis.*]
+ignore_missing_imports = True
+
+[mypy-rq.*]
+ignore_missing_imports = True
+
+[mypy-redis.*]
+ignore_missing_imports = True
+
+[mypy-magic.*]
+ignore_missing_imports = True
+
+[mypy-werkzeug.*]
+ignore_missing_imports = True
+
+[mypy-setuptools.*]
 ignore_missing_imports = True
\ No newline at end of file
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index f613e61..afea419 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -1,9 +1,12 @@
+import jsonpickle
+import sqlite3
+
 from rq import Queue
 from rq.job import Job
 from redis import Redis
 from flask import current_app as app
 
-def enqueue_job(delayed_fn, *args, **kwargs):
+def enqueue_job(delayed_fn: str, *args, **kwargs):
     """Add job to queue"""
     with Redis.from_url(app.config["REDIS_URL"]) as rconn:
         job = Job.create(
@@ -11,21 +14,18 @@ def enqueue_job(delayed_fn, *args, **kwargs):
                 key: val for key, val in kwargs.items()
                 if key != "additional_jobs_meta"
             },
-            connection = rconn)
-        job.meta["status"] = "enqueued"
-        job.meta["progress"] = 0
-        if "additional_jobs_meta" in kwargs:
-            for key, val in kwargs["additional_jobs_meta"].items():
-                job.meta[key] = val
-
-        job.save_meta()
+            connection = rconn,
+            timeout = "2h",
+            result_ttl = "5h",
+            failure_ttl = "5h"
+        )
 
         queue = Queue("qcapp_queue", connection=rconn)
         queue.enqueue_job(job)
 
     return job
 
-def job(job_id):
+def job(job_id: str):
     "Retrieve the job"
     with Redis.from_url(app.config["REDIS_URL"]) as rconn:
         queue = Queue("qcapp_queue", connection=rconn)
@@ -34,14 +34,49 @@ def job(job_id):
 
     return job
 
-def update_meta(rconn, stale_job, **kwargs):
+def create_jobs_table(dbpath: str):
+    """Create sqlite3 table to track job progress"""
+    conn = sqlite3.connect(dbpath)
+    cursor = conn.cursor()
+    cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
+    cursor.close()
+    conn.close()
+
+def retrieve_meta(conn, job_id: str):
+    """Retrieve the job's metadata."""
+    meta = {}
+    try:
+        cursor = conn.cursor()
+        cursor.execute(
+            "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
+        db_row = cursor.fetchone()
+        # meta = json.loads(db_row [1]) if db_row else {}
+        meta = jsonpickle.decode(db_row [1]) if db_row else {}
+    except Exception as exc:
+        cursor.close()
+        raise exc
+    finally:
+        cursor.close()
+
+    return meta
+
+def update_meta(conn, job_id: str, **meta):
     """Update the job's metadata."""
-    job = Job.fetch(stale_job.get_id(), connection=rconn)
-    job.refresh()
-    meta_dict = {**stale_job.meta, **job.meta, **kwargs}
-    for key, val in meta_dict.items():
-        job.meta[key] = val
+    try:
+        cursor = conn.cursor()
+        old_meta = retrieve_meta(conn, job_id)
+        meta = {**old_meta, **meta}
+        query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
+        if not bool(old_meta):
+            query = "INSERT INTO jobs VALUES (:job_id, :meta)"
 
-    job.save_meta()
+        cursor.execute(
+            query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
+        conn.commit()
+    except Exception as exc:
+        cursor.close()
+        raise exc
+    finally:
+        cursor.close()
 
-    return job
+    return meta
diff --git a/qc_app/parse.py b/qc_app/parse.py
index 3398918..795cc01 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -1,5 +1,6 @@
 """File parsing module"""
 import os
+import sqlite3
 from functools import reduce
 
 from redis import Redis
@@ -23,32 +24,41 @@ from quality_control.parsing import (
 
 parsebp = Blueprint("parse", __name__)
 
-def queued_parse(filepath, filetype, strainsfile, redis_url):
+def queued_parse(
+        filepath: str, filetype: FileType, strainsfile: str, redis_url: str,
+        dbpath: str):
     job = get_current_job()
+    job_id = job.get_id()
     with Redis.from_url(redis_url) as rconn:
-        jobs.update_meta(rconn, job, status = "in-progress", progress = 0)
-        job.save_meta()
+        dbconn = sqlite3.connect(dbpath)
         try:
+            job_meta = jobs.update_meta(
+                dbconn, job_id, status = "in-progress", progress = 0)
             parsed = parse_file(
                 filepath, filetype, strain_names(parse_strains(strainsfile)))
             for line, curr_size in parsed:
-                jobs.update_meta(
-                    rconn, job, progress = (curr_size/job.meta["filesize"]) * 100,
+                job_meta = jobs.update_meta(
+                    dbconn, job_id,
+                    progress=((curr_size/job_meta["filesize"]) * 100),
                     message = f"Parsed {curr_size} bytes")
-                print(f"Progress: {curr_size} bytes: {(curr_size/job.meta['filesize']) * 100}%")
 
             os.remove(filepath)
-            jobs.update_meta(
-                rconn, job, progress = 10, status = "success",
+            jobs_meta = jobs.update_meta(
+                dbconn, job_id, progress = 100, status = "success",
                 message = "no errors found")
         except ParseError as pe:
             pe_dict = pe.args[0]
-            jobs.update_meta(
-                rconn, job, status = "parse-error", results = {
+            job_meta = jobs.update_meta(
+                dbconn, job_id, status = "parse-error", results = {
                     "filename": os.path.basename(filepath), "filetype": filetype,
                     "position": pe_dict["position"],
                     "line_number": pe_dict["line_number"]
                 })
+        finally:
+            dbconn.close()
+
+def retrieve_dbpath():
+    return os.path.join(app.config["UPLOAD_FOLDER"], "jobs.db")
 
 @parsebp.route("/parse", methods=["GET"])
 def parse():
@@ -73,94 +83,145 @@ def parse():
     if errors:
         return redirect(url_for("entry.index"))
 
+    jobs.create_jobs_table(retrieve_dbpath())
     filetype = (
         FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR)
     job = jobs.enqueue_job(
         "qc_app.parse.queued_parse", filepath, filetype,
         os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"],
-        additional_jobs_meta = {
-            "filename": filename, "filesize": os.stat(filepath).st_size})
+        retrieve_dbpath())
+    try:
+        dbconn = sqlite3.connect(retrieve_dbpath())
+        jobs.update_meta(
+            dbconn, job.get_id(), filename=filename, filesize=os.stat(filepath).st_size,
+            status="enqueued", progress=0)
+    except Exception as exc:
+        import traceback
+        print(traceback.format_exc())
+        dbconn.rollback()
+    finally:
+        dbconn.close()
+
     return redirect(url_for("parse.parse_status", job_id=job.get_id()))
 
 @parsebp.route("/status/<job_id>", methods=["GET"])
-def parse_status(job_id):
-    job = jobs.job(job_id)
-    if job:
-        job_id = job.get_id()
-        progress = job.meta["progress"]
-        status = job.meta["status"]
-        filename = job.meta.get("filename", "uploaded file")
-        if status == "success":
-            return redirect(url_for("parse.results", job_id=job_id))
-
-        if status == "parse-error":
-            return redirect(url_for("parse.fail", job_id=job_id))
-
-        return render_template(
-            "job_progress.html",
-            job_id = job_id,
-            job_status = status,
-            progress = progress,
-            message = job.meta.get("message", ""),
-            job_name = f"Parsing '{filename}'")
-
-    return render_template("no_such_job.html", job_id=job_id)
+def parse_status(job_id: str):
+    try:
+        dbconn = sqlite3.connect(retrieve_dbpath(), timeout=10)
+        job = jobs.job(job_id)
+        if job:
+            job_id = job.get_id()
+            job_meta = jobs.retrieve_meta(dbconn, job_id)
+            progress = job_meta["progress"]
+            status = job_meta["status"]
+            filename = job_meta.get("filename", "uploaded file")
+            if status == "success":
+                return redirect(url_for("parse.results", job_id=job_id))
+
+            if status == "parse-error":
+                return redirect(url_for("parse.fail", job_id=job_id))
+
+            return render_template(
+                "job_progress.html",
+                job_id = job_id,
+                job_status = status,
+                progress = progress,
+                message = job_meta.get("message", ""),
+                job_name = f"Parsing '{filename}'")
+
+        return render_template("no_such_job.html", job_id=job_id)
+    except sqlite3.OperationalError:
+        return redirect(url_for("parse.parse_status", job_id=job_id))
+    except Exception as exc:
+        import traceback
+        print(traceback.format_exc())
+        return exc
+    finally:
+        dbconn.close()
 
 @parsebp.route("/results/<job_id>", methods=["GET"])
-def results(job_id):
+def results(job_id: str):
     """Show results of parsing..."""
     job = jobs.job(job_id)
     if job:
-        filename = job.meta["filename"]
-        errors = job.meta.get("errors", [])
-        return render_template(
-            "parse_results.html",
-            errors=errors,
-            job_name = f"Parsing '{filename}'",
-            starting_line_number=job.meta.get("starting_line_number", 0))
+        try:
+            dbconn = sqlite3.connect(retrieve_dbpath())
+            job_meta = jobs.retrieve_meta(dbconn, job_id)
+            filename = job_meta["filename"]
+            errors = job_meta.get("errors", [])
+            return render_template(
+                "parse_results.html",
+                errors=errors,
+                job_name = f"Parsing '{filename}'",
+                starting_line_number=job_meta.get("starting_line_number", 0))
+        except Exception as exc:
+            import traceback
+            print(traceback.format_exc())
+        finally:
+            dbconn.close()
 
     return render_template("no_such_job.html", job_id=job_id)
 
-def queued_collect_errors(filepath, filetype, strainsfile, redis_url, seek_pos=0):
+def queued_collect_errors(
+        filepath: str, filetype: FileType, strainsfile: str, redis_url: str,
+        dbpath: str, seek_pos: int = 0):
     job = get_current_job()
+    job_id = job.get_id()
     errors = []
     count = 0
     with Redis.from_url(redis_url) as rconn:
-        for error in parse_errors(
-                filepath, filetype, strain_names(parse_strains(strainsfile)),
-                seek_pos):
-            count = count + 1
-            jobs.update_meta(
-                rconn, job, message = f"Collected {count} errors", progress = (
-                    (error["position"] / job.meta["filesize"]) * 100))
-            errors.append(error)
-
-        jobs.update_meta(
-            rconn, job, errors = errors, progress = 100, status = "success")
+        try:
+            dbconn = sqlite3.connect(dbpath)
+            job_meta = jobs.retrieve_meta(dbconn, job.get_id())
+            for error in parse_errors(
+                    filepath, filetype, strain_names(parse_strains(strainsfile)),
+                    seek_pos):
+                count = count + 1
+                progress  = ((error["position"] / job_meta["filesize"]) * 100)
+                print(f"CURRENT PROGRESS: {progress}")
+                job_meta = jobs.update_meta(
+                    dbconn, job_id, message = f"Collected {count} errors",
+                    progress = progress)
+                errors.append(error)
+
+            job_meta = jobs.update_meta(
+                dbconn, job_id, errors = errors, progress = 100, status = "success")
+            os.remove(filepath)
+        except Exception as exc:
+            dbconn.rollback()
+        finally:
+            dbconn.close()
 
 @parsebp.route("/fail/<job_id>", methods=["GET"])
-def fail(job_id):
+def fail(job_id: str):
     """Handle parsing failure"""
-    old_job = jobs.job(job_id)
-    if old_job:
-        old_job.refresh()
-        job_id = old_job.get_id()
-        progress = old_job.meta.get("progress", 0)
-        status = old_job.meta["status"]
-        results = old_job.meta["results"]
-        filename = old_job.meta.get("filename", "uploaded file")
-
-        new_job = jobs.enqueue_job(
-            "qc_app.parse.queued_collect_errors",
-            os.path.join(
-                app.config["UPLOAD_FOLDER"], old_job.meta["filename"]),
-            results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"),
-            app.config["REDIS_URL"], results["position"],
-            additional_jobs_meta = {
-                "status": "Collecting Errors",
-                "filename": old_job.meta["filename"],
-                "filesize": old_job.meta["filesize"],
-                "starting_line_number": results["line_number"]})
-        return redirect(url_for("parse.parse_status", job_id=new_job.get_id()))
-
-    return render_template("no_such_job.html", job_id=job_id)
+    try:
+        dbpath = retrieve_dbpath()
+        dbconn = sqlite3.connect(dbpath)
+        old_job = jobs.job(job_id)
+        if old_job:
+            job_id = old_job.get_id()
+            old_meta = jobs.retrieve_meta(dbconn, job_id)
+            progress = old_meta["progress"]
+            status = old_meta["status"]
+            results = old_meta["results"]
+            filename = old_meta["filename"]
+
+            new_job = jobs.enqueue_job(
+                "qc_app.parse.queued_collect_errors",
+                os.path.join(
+                    app.config["UPLOAD_FOLDER"], old_meta["filename"]),
+                results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"),
+                app.config["REDIS_URL"], dbpath, results["position"])
+            jobs.update_meta(
+                dbconn, new_job.get_id(), status =  "Collecting Errors",
+                filename = old_meta["filename"], filesize = old_meta["filesize"],
+                starting_line_number = results["line_number"],
+                progress = progress)
+            return redirect(url_for("parse.parse_status", job_id=new_job.get_id()))
+
+        return render_template("no_such_job.html", job_id=job_id)
+    except Exception as exc:
+        dbconn.rollback()
+    finally:
+        dbconn.close()