aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-04-26 09:43:18 +0300
committerFrederick Muriuki Muriithi2022-04-26 09:43:18 +0300
commite6895f5bac672d2e1d2a04fe8118fa55c3a40b91 (patch)
tree3597796b13b3b321c8670aa71b080eabf3357b60
parenta5477c59452cdb01ab536f11eb5ed6fab015f3af (diff)
downloadgn-uploader-e6895f5bac672d2e1d2a04fe8118fa55c3a40b91.tar.gz
Queue file parsing jobs
Enable the queuing of file parsing jobs, since the files could be really large and take a long time to parse and present results.

* etc/default_config.py: Add default config for redis server
* manifest.scm: Add redis, and rq as dependencies
* qc_app/__init__.py
* qc_app/jobs.py: module to hold utilities for management of the jobs
* qc_app/parse.py: Enqueue the job
  - extract file-parsing code to callable function
* qc_app/templates/base.html: Enable addition of extra meta tags
* qc_app/templates/job_progress.html: template to display job progress
* qc_app/templates/no_such_job.html: template to indicate when a job id is invalid
* quality_control/parsing.py: Add the total size parsed so far
-rw-r--r--etc/default_config.py1
-rw-r--r--manifest.scm2
-rw-r--r--qc_app/__init__.py1
-rw-r--r--qc_app/jobs.py19
-rw-r--r--qc_app/parse.py84
-rw-r--r--qc_app/templates/base.html1
-rw-r--r--qc_app/templates/job_progress.html19
-rw-r--r--qc_app/templates/no_such_job.html14
-rw-r--r--quality_control/parsing.py6
9 files changed, 127 insertions, 20 deletions
diff --git a/etc/default_config.py b/etc/default_config.py
index 76df031..28da532 100644
--- a/etc/default_config.py
+++ b/etc/default_config.py
@@ -8,3 +8,4 @@ import os
LOG_LEVEL = os.getenv("LOG_LEVEL", "WARNING")
SECRET_KEY = b"<Please! Please! Please! Change This!>"
UPLOAD_FOLDER = "/tmp/qc_app_files"
+REDIS_URL = "redis://"
diff --git a/manifest.scm b/manifest.scm
index d5ce588..d93902a 100644
--- a/manifest.scm
+++ b/manifest.scm
@@ -1,6 +1,8 @@
(specifications->manifest
(list "python"
+ "python-rq"
"python-mypy"
+ "python-redis"
"python-flask"
"python-pylint"
"python-pytest"
diff --git a/qc_app/__init__.py b/qc_app/__init__.py
index 9b5ed76..35cc422 100644
--- a/qc_app/__init__.py
+++ b/qc_app/__init__.py
@@ -1,6 +1,7 @@
"""The Quality-Control Web Application entry point"""
import os
+
from flask import Flask
from .entry import entrybp
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
new file mode 100644
index 0000000..dbeb9ce
--- /dev/null
+++ b/qc_app/jobs.py
@@ -0,0 +1,19 @@
+from rq import Queue
+from redis import Redis
+from flask import current_app as app
+
+def enqueue_job(delayed_fn, *args, **kwargs):
+    """Put `delayed_fn` on the "qcapp_queue" RQ queue and return the job.
+
+    `*args` and `**kwargs` are forwarded to `Queue.enqueue`.  The job's
+    metadata starts out with `status` set to "enqueued"; any `meta`
+    dictionary the caller supplies is kept, apart from that key.
+    """
+    # Hand the initial metadata to RQ as part of the enqueue call itself.
+    # Saving it in a separate step after enqueueing could overwrite
+    # metadata that a fast worker has already stored for the same job.
+    meta = {**(kwargs.pop("meta", None) or {}), "status": "enqueued"}
+    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
+        queue = Queue("qcapp_queue", connection=rconn)
+        return queue.enqueue(delayed_fn, *args, meta=meta, **kwargs)
+
+def job(job_id):
+    """Look up the job with id `job_id` on the "qcapp_queue" RQ queue.
+
+    Returns whatever `Queue.fetch_job` gives back for that id.
+    """
+    with Redis.from_url(app.config["REDIS_URL"]) as conn:
+        return Queue("qcapp_queue", connection=conn).fetch_job(job_id)
diff --git a/qc_app/parse.py b/qc_app/parse.py
index aa88260..1ebe637 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -1,8 +1,16 @@
"""File parsing module"""
import os
-from flask import request, url_for, redirect, Blueprint, current_app as app
+from rq import get_current_job
+from flask import (
+ request,
+ url_for,
+ redirect,
+ Blueprint,
+ render_template,
+ current_app as app)
+from . import jobs
from quality_control.errors import ParseError
from quality_control.parsing import (
FileType,
@@ -12,11 +20,37 @@ from quality_control.parsing import (
parsebp = Blueprint("parse", __name__)
+def queued_parse(filepath, filetype):
+    """Parse the file at `filepath`, recording progress on the current job.
+
+    Runs as an RQ job.  While parsing, the job's metadata is updated with
+    a `progress` percentage and a `status` message.  When parsing ends,
+    `status` is either "success" or "parse-error" and `results` holds the
+    details.  The file is deleted only if it parses without errors.
+    """
+    job = get_current_job()
+    filename = os.path.basename(filepath)
+    job.meta["filename"] = filename
+    job.meta["status"] = "in-progress"
+    job.save_meta()
+    filesize = os.stat(filepath).st_size
+    try:
+        parsed = parse_file(
+            filepath, filetype, strain_names(parse_strains("strains.csv")))
+        # Only the running size is needed here; the parsed line is unused.
+        for _line, curr_size in parsed:
+            job.meta["progress"] = (curr_size/filesize) * 100
+            job.meta["status"] = f"Parsed {curr_size} bytes"
+            job.save_meta()
+
+        os.remove(filepath)
+        job.meta["progress"] = 100
+        job.meta["status"] = "success"
+        job.meta["results"] = {"message": "no errors found"}
+        job.save_meta()
+    except ParseError as pe:
+        pe_dict = pe.args[0]
+        job.meta["status"] = "parse-error"
+        job.meta["results"] = {
+            # Report the name derived from `filepath`: this function only
+            # receives the path, so no other file name is available here.
+            "filename": filename, "filetype": filetype,
+            "position": pe_dict["position"]
+        }
+        job.save_meta()
+
@parsebp.route("/parse", methods=["GET"])
def parse():
"""Trigger file parsing"""
- # TODO: figure out how to redirect with post
- # TODO: figure out how to stat file and get: total number of lines
# TODO: Maybe implement external process to parse the files
errors = False
filename = request.args.get("filename")
@@ -39,22 +73,36 @@ def parse():
filetype = (
FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR)
- try:
- parsed = parse_file(
- filepath, filetype, strain_names(parse_strains("strains.csv")))
- for line in parsed:
- pass
- os.remove(filepath)
- return redirect(url_for(
- "parse.success", filename=filename, filetype=filetype))
- except ParseError as pe:
- pe_dict = pe.args[0]
- return redirect(url_for(
- "parse.fail", filename = filename, filetype = filetype,
- position = pe_dict["position"]))
+ job = jobs.enqueue_job("qc_app.parse.queued_parse", filepath, filetype)
+ job.meta["filename"] = filename
+ job.save_meta()
+ return redirect(url_for("parse.parse_status", job_id=job.get_id()))
+
+@parsebp.route("/status/<job_id>", methods=["GET"])
+def parse_status(job_id):
+    """Show the progress of a parsing job, or redirect once it has ended.
+
+    Renders "no_such_job.html" when no job is found for `job_id`.
+    """
+    job = jobs.job(job_id)
+    if not job:
+        return render_template("no_such_job.html", job_id=job_id)
+
+    job_id = job.get_id()
+    status = job.meta["status"]
+    if status == "success":
+        return redirect(url_for("parse.results", job_id=job_id))
+    if status == "parse-error":
+        return redirect(url_for("parse.fail", job_id=job_id))
+
+    # Still queued or running: show the auto-refreshing progress page.
+    filename = job.meta.get("filename", "uploaded file")
+    return render_template(
+        "job_progress.html",
+        job_id = job_id,
+        job_status = status,
+        progress = job.meta.get("progress", 0),
+        job_name = f"Parsing '{filename}'")
-@parsebp.route("/success", methods=["GET"])
-def success():
+@parsebp.route("/results/<job_id>", methods=["GET"])
+def results(job_id):
"""Indicates success if parsing the file is successful"""
return "STUB: Parse success!!!"
diff --git a/qc_app/templates/base.html b/qc_app/templates/base.html
index 67ba6b5..623141a 100644
--- a/qc_app/templates/base.html
+++ b/qc_app/templates/base.html
@@ -4,6 +4,7 @@
<meta charset="UTF-8" />
<meta application-name="GeneNetwork Quality-Control Application" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
+ {%block extrameta%}{%endblock%}
<title>QC: {%block title%}{%endblock%}</title>
diff --git a/qc_app/templates/job_progress.html b/qc_app/templates/job_progress.html
new file mode 100644
index 0000000..1c6aa32
--- /dev/null
+++ b/qc_app/templates/job_progress.html
@@ -0,0 +1,19 @@
+{%extends "base.html"%}
+
+{%block extrameta%}
+{# Reload the page every 5 seconds so the displayed progress stays current. #}
+<meta http-equiv="refresh" content="5">
+{%endblock%}
+
+{%block title%}Job Status{%endblock%}
+
+{%block contents%}
+<h1 class="heading">{{job_name}}</h1>
+
+<label for="job_status">status:</label>
+<span>{{job_status}}</span><br />
+
+<label for="job_{{job_id}}">parsing: </label>
+{# `progress` is a percentage, so the bar needs an explicit range of 0 to 100. #}
+<progress id="job_{{job_id}}" value="{{progress}}" max="100">{{progress}}</progress>
+<span>{{"%.2f" | format(progress)}}%</span>
+
+{%endblock%}
diff --git a/qc_app/templates/no_such_job.html b/qc_app/templates/no_such_job.html
new file mode 100644
index 0000000..42a2d48
--- /dev/null
+++ b/qc_app/templates/no_such_job.html
@@ -0,0 +1,14 @@
+{%extends "base.html"%}
+
+{%block extrameta%}
+{# After 5 seconds, send the user to the `entry.upload_file` page. #}
+<meta http-equiv="refresh" content="5;url={{url_for('entry.upload_file')}}">
+{%endblock%}
+
+{%block title%}No Such Job{%endblock%}
+
+{%block contents%}
+<h1 class="heading">No Such Job: {{job_id}}</h1>
+
+<p>No job, with the id '<em>{{job_id}}</em>' was found!</p>
+
+{%endblock%}
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index eb4c989..a4edb0f 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -66,8 +66,10 @@ def parse_file(filepath: str, filetype: FileType, strains: list):
seek_pos = seek_pos + len(line)
continue
- yield LINE_PARSERS[filetype](
- tuple(field.strip() for field in line.split("\t")))
+ yield (
+ LINE_PARSERS[filetype](
+ tuple(field.strip() for field in line.split("\t"))),
+ seek_pos + len(line))
seek_pos = seek_pos + len(line)
except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
raise ParseError({