diff options
-rw-r--r-- | README.org | 22 | ||||
-rw-r--r-- | manifest.scm | 1 | ||||
-rw-r--r-- | qc_app/jobs.py | 96 | ||||
-rw-r--r-- | qc_app/parse.py | 237 | ||||
-rw-r--r-- | qc_app/templates/job_progress.html | 2 | ||||
-rw-r--r-- | qc_app/templates/parse_results.html | 48 | ||||
-rw-r--r-- | scripts/worker.py | 88 |
7 files changed, 233 insertions, 261 deletions
@@ -41,28 +41,10 @@ application. *** Run the CLI version -To run the CLI version of the application, we need to set up the correct ~PYTHONPATH~ +Run the CLI version of the application, with #+BEGIN_SRC shell -export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$(pwd)" +python3 -m scripts.qc --help #+END_SRC -which enables us to run the script with -#+BEGIN_SRC shell -python3 scripts/qc.py --help -#+END_SRC - -Without setting up the ~PYTHONPATH~ environment variable, you might get an error -like the following -#+BEGIN_EXAMPLE shell -$ python3 scripts/qc.py --help -Traceback (most recent call last): - File "/home/frederick/genenetwork/gnqc_py/scripts/qc.py", line 8, in <module> - from quality_control.errors import ParseError -ModuleNotFoundError: No module named 'quality_control' -#+END_EXAMPLE - -*NOTE*: Setting up the ~PYTHONPATH~ environment variable is only necessary for -the development environment. It *MUST NOT* require that the end-user set this up -at all! *** Run the web version diff --git a/manifest.scm b/manifest.scm index 2062b07..42d95bb 100644 --- a/manifest.scm +++ b/manifest.scm @@ -1,6 +1,5 @@ (specifications->manifest (list "python" - "python-rq" "python-mypy" "python-redis" "python-flask" diff --git a/qc_app/jobs.py b/qc_app/jobs.py index afea419..4d3dba6 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -1,82 +1,32 @@ -import jsonpickle -import sqlite3 +import os +import shlex +import subprocess +from uuid import uuid4 -from rq import Queue -from rq.job import Job from redis import Redis -from flask import current_app as app -def enqueue_job(delayed_fn: str, *args, **kwargs): - """Add job to queue""" - with Redis.from_url(app.config["REDIS_URL"]) as rconn: - job = Job.create( - delayed_fn, args, **{ - key: val for key, val in kwargs.items() - if key != "additional_jobs_meta" - }, - connection = rconn, - timeout = "2h", - result_ttl = "5h", - failure_ttl = "5h" - ) +def error_filename(job_id, error_dir): + return f"{error_dir}/job_{job_id}.error" - queue = Queue("qcapp_queue", connection=rconn) - queue.enqueue_job(job) +def launch_job(redis_conn: Redis, filepath, filetype, redisurl, error_dir): + """Launch a job in the background""" + job_id = str(uuid4()) + command = [ + "python3", "-m" "scripts.worker", filetype, filepath, redisurl, job_id] + job = { + "job_id": job_id, "command": shlex.join(command), "status": "pending", + "filename": os.path.basename(filepath), "percent": 0 + } + redis_conn.hset(name=job["job_id"], mapping=job) - return job + if not os.path.exists(error_dir): + os.mkdir(error_dir) -def job(job_id: str): - "Retrieve the job" - with Redis.from_url(app.config["REDIS_URL"]) as rconn: - queue = Queue("qcapp_queue", connection=rconn) - job = queue.fetch_job(job_id) - job.refresh() + with open(error_filename(job_id, error_dir), "w") as errorfile: + subprocess.Popen(command, stderr=errorfile) return job -def create_jobs_table(dbpath: str): - """Create sqlite3 table to track job progress""" - conn = sqlite3.connect(dbpath) - cursor = conn.cursor() - cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)") - cursor.close() - conn.close() - -def retrieve_meta(conn, job_id: str): - """Retrieve the job's metadata.""" - meta = {} - try: - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id}) - db_row = cursor.fetchone() - # meta = json.loads(db_row [1]) if db_row else {} - meta = jsonpickle.decode(db_row [1]) if db_row else {} - except Exception as exc: - cursor.close() - raise exc - finally: - cursor.close() - - return meta - -def update_meta(conn, job_id: str, **meta): - """Update the job's metadata.""" - try: - cursor = conn.cursor() - old_meta = retrieve_meta(conn, job_id) - meta = {**old_meta, **meta} - query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id" - if not bool(old_meta): - query = "INSERT INTO jobs VALUES (:job_id, :meta)" - - cursor.execute( - query, {"job_id": job_id, "meta": jsonpickle.encode(meta)}) - conn.commit() - except Exception as exc: - cursor.close() - raise exc - finally: - cursor.close() - - return meta +def job(redis_conn, job_id: str): + "Retrieve the job" + return redis_conn.hgetall(job_id) diff --git a/qc_app/parse.py b/qc_app/parse.py index baad9a6..d2fa030 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -1,10 +1,9 @@ """File parsing module""" import os -import sqlite3 from functools import reduce +import jsonpickle from redis import Redis -from rq import get_current_job from flask import ( request, url_for, @@ -14,50 +13,13 @@ from flask import ( current_app as app) from . import jobs -from quality_control.errors import ParseError +from quality_control.errors import InvalidValue from quality_control.parsing import ( FileType, - parse_file, - strain_names, - parse_errors) + strain_names) parsebp = Blueprint("parse", __name__) -def queued_parse( - filepath: str, filetype: FileType, strainsfile: str, redis_url: str, - dbpath: str): - job = get_current_job() - job_id = job.get_id() - with Redis.from_url(redis_url) as rconn: - dbconn = sqlite3.connect(dbpath) - try: - job_meta = jobs.update_meta( - dbconn, job_id, status = "in-progress", progress = 0) - parsed = parse_file(filepath, filetype, strain_names(strainsfile)) - for line, curr_size in parsed: - job_meta = jobs.update_meta( - dbconn, job_id, - progress=((curr_size/job_meta["filesize"]) * 100), - message = f"Parsed {curr_size} bytes") - - os.remove(filepath) - jobs_meta = jobs.update_meta( - dbconn, job_id, progress = 100, status = "success", - message = "no errors found") - except ParseError as pe: - pe_dict = pe.args[0] - job_meta = jobs.update_meta( - dbconn, job_id, status = "parse-error", results = { - "filename": os.path.basename(filepath), "filetype": filetype, - "position": pe_dict["position"], - "line_number": pe_dict["line_number"] - }) - finally: - dbconn.close() - -def retrieve_dbpath(): - return os.path.join(app.config["UPLOAD_FOLDER"], "jobs.db") - @parsebp.route("/parse", methods=["GET"]) def parse(): """Trigger file parsing""" @@ -73,6 +35,10 @@ def parse(): flash("No filetype provided", "alert-error") errors = True + if filetype not in ("average", "standard-error"): + flash("Invalid filetype provided", "alert-error") + errors = True + filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename) if not os.path.exists(filepath): flash("Selected file does not exist (any longer)", "alert-danger") @@ -81,144 +47,83 @@ def parse(): if errors: return redirect(url_for("entry.index")) - jobs.create_jobs_table(retrieve_dbpath()) - filetype = ( - FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) - job = jobs.enqueue_job( - "qc_app.parse.queued_parse", filepath, filetype, - os.path.join(os.getcwd(), "etc/strains.csv"), app.config["REDIS_URL"], - retrieve_dbpath()) - try: - dbconn = sqlite3.connect(retrieve_dbpath()) - jobs.update_meta( - dbconn, job.get_id(), filename=filename, filesize=os.stat(filepath).st_size, - status="enqueued", progress=0) - except Exception as exc: - import traceback - print(traceback.format_exc()) - dbconn.rollback() - finally: - dbconn.close() - - return redirect(url_for("parse.parse_status", job_id=job.get_id())) + with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: + job = jobs.launch_job( + rconn, filepath, filetype, app.config["REDIS_URL"], + f"{app.config['UPLOAD_FOLDER']}/job_errors") + + return redirect(url_for("parse.parse_status", job_id=job["job_id"])) @parsebp.route("/status/<job_id>", methods=["GET"]) def parse_status(job_id: str): - try: - dbconn = sqlite3.connect(retrieve_dbpath(), timeout=10) - job = jobs.job(job_id) - if job: - job_id = job.get_id() - job_meta = jobs.retrieve_meta(dbconn, job_id) - progress = job_meta["progress"] - status = job_meta["status"] - filename = job_meta.get("filename", "uploaded file") - if status == "success": - return redirect(url_for("parse.results", job_id=job_id)) - - if status == "parse-error": + with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: + job = jobs.job(rconn, job_id) + + if job: + error_filename = jobs.error_filename( + job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors") + if os.path.exists(error_filename): + stat = os.stat(error_filename) + if stat.st_size > 0: return redirect(url_for("parse.fail", job_id=job_id)) - return render_template( - "job_progress.html", - job_id = job_id, - job_status = status, - progress = progress, - message = job_meta.get("message", ""), - job_name = f"Parsing '{filename}'") - - return render_template("no_such_job.html", job_id=job_id) - except sqlite3.OperationalError: - return redirect(url_for("parse.parse_status", job_id=job_id)) - except Exception as exc: - import traceback - print(traceback.format_exc()) - return exc - finally: - dbconn.close() + print(f"THE JOB {job}") + job_id = job["job_id"] + progress = float(job["percent"]) + status = job["status"] + filename = job.get("filename", "uploaded file") + if status == "success": + return redirect(url_for("parse.results", job_id=job_id)) + + if status == "parse-error": + return redirect(url_for("parse.fail", job_id=job_id)) + + return render_template( + "job_progress.html", + job_id = job_id, + job_status = status, + progress = progress, + message = job.get("message", ""), + job_name = f"Parsing '{filename}'") + + return render_template("no_such_job.html", job_id=job_id) @parsebp.route("/results/<job_id>", methods=["GET"]) def results(job_id: str): """Show results of parsing...""" - job = jobs.job(job_id) + with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: + job = jobs.job(rconn, job_id) + if job: - try: - dbconn = sqlite3.connect(retrieve_dbpath()) - job_meta = jobs.retrieve_meta(dbconn, job_id) - filename = job_meta["filename"] - errors = job_meta.get("errors", []) - return render_template( - "parse_results.html", - errors=errors, - job_name = f"Parsing '{filename}'", - starting_line_number=job_meta.get("starting_line_number", 0)) - except Exception as exc: - import traceback - print(traceback.format_exc()) - finally: - dbconn.close() + filename = job["filename"] + errors = jsonpickle.decode(job["errors"]) + return render_template( + "parse_results.html", + errors=errors, + job_name = f"Parsing '{filename}'", + isinvalidvalue=lambda item: isinstance(item, InvalidValue)) return render_template("no_such_job.html", job_id=job_id) -def queued_collect_errors( - filepath: str, filetype: FileType, strainsfile: str, redis_url: str, - dbpath: str, seek_pos: int = 0): - job = get_current_job() - job_id = job.get_id() - errors = [] - count = 0 - with Redis.from_url(redis_url) as rconn: - try: - dbconn = sqlite3.connect(dbpath) - job_meta = jobs.retrieve_meta(dbconn, job.get_id()) - for error in parse_errors( - filepath, filetype, strain_names(strainsfile), - seek_pos): - count = count + 1 - progress = ((error["position"] / job_meta["filesize"]) * 100) - job_meta = jobs.update_meta( - dbconn, job_id, message = f"Collected {count} errors", - progress = progress) - errors.append(error) - - job_meta = jobs.update_meta( - dbconn, job_id, errors = errors, progress = 100, status = "success") - os.remove(filepath) - except Exception as exc: - dbconn.rollback() - finally: - dbconn.close() - @parsebp.route("/fail/<job_id>", methods=["GET"]) def fail(job_id: str): """Handle parsing failure""" - try: - dbpath = retrieve_dbpath() - dbconn = sqlite3.connect(dbpath) - old_job = jobs.job(job_id) - if old_job: - job_id = old_job.get_id() - old_meta = jobs.retrieve_meta(dbconn, job_id) - progress = old_meta["progress"] - status = old_meta["status"] - results = old_meta["results"] - filename = old_meta["filename"] - - new_job = jobs.enqueue_job( - "qc_app.parse.queued_collect_errors", - os.path.join( - app.config["UPLOAD_FOLDER"], old_meta["filename"]), - results["filetype"], os.path.join(os.getcwd(), "etc/strains.csv"), - app.config["REDIS_URL"], dbpath, results["position"]) - jobs.update_meta( - dbconn, new_job.get_id(), status = "Collecting Errors", - filename = old_meta["filename"], filesize = old_meta["filesize"], - starting_line_number = results["line_number"], - progress = progress) - return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) - - return render_template("no_such_job.html", job_id=job_id) - except Exception as exc: - dbconn.rollback() - finally: - dbconn.close() + with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: + old_job = jobs.job(rconn, job_id) + + if old_job: + error_filename = jobs.error_filename( + job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors") + if os.path.exists(error_filename): + stat = os.stat(error_filename) + if stat.st_size > 0: + with open(error_filename) as errorfile: + return errorfile.read() + + # job_id = old_job["job_id"] + progress = old_job["progress"] + status = old_job["status"] + results = old_job["results"] + filename = old_job["filename"] + + return render_template("no_such_job.html", job_id=job_id) diff --git a/qc_app/templates/job_progress.html b/qc_app/templates/job_progress.html index 26dfe6f..8a9256f 100644 --- a/qc_app/templates/job_progress.html +++ b/qc_app/templates/job_progress.html @@ -13,7 +13,7 @@ <span>{{job_status}}: {{message}}</span><br /> <label for="job_{{job_id}}">parsing: </label> -<progress id="job_{{job_id}}" value="{{progress}}">{{progress}}</progress> +<progress id="job_{{job_id}}" value="{{progress/100}}">{{progress}}</progress> <span>{{"%.2f" | format(progress)}}%</span> {%endblock%} diff --git a/qc_app/templates/parse_results.html b/qc_app/templates/parse_results.html new file mode 100644 index 0000000..a750bb5 --- /dev/null +++ b/qc_app/templates/parse_results.html @@ -0,0 +1,48 @@ +{%extends "base.html"%} + +{%block title%}Parse Results{%endblock%} + +{%block contents%} +<h1 class="heading">{{job_name}}: parse results</h2> + +{%if errors | length == 0 %} +<span class="alert-success">No errors found in the file</span> +{%else %} +<p class="alert-error">We found the following errors</p> + +<table class="reports-table"> + <thead> + <tr> + <th>line number</th> + <th>column(s)</th> + <th>error</th> + <th>error message</th> + </tr> + </thead> + + <tbody> + {%for error in errors%} + <tr> + <td>{{error["line"]}}</td> + <td> + {%if isinvalidvalue(error):%} + {{error.column}} + {%else: %} + {{error.columns}} + {%endif %} + </td> + <td> + {%if isinvalidvalue(error):%} + Invalid Value + {%else: %} + Duplicate Header + {%endif %} + </td> + <td>{{error["message"]}}</td> + </tr> + {%endfor%} + </tbody> +</table> +{%endif%} + +{%endblock%} diff --git a/scripts/worker.py b/scripts/worker.py new file mode 100644 index 0000000..f4d5b6b --- /dev/null +++ b/scripts/worker.py @@ -0,0 +1,88 @@ +import os +import sys +from typing import Callable + +import jsonpickle +from redis import Redis +from redis.exceptions import ConnectionError + +from .qc import cli_argument_parser +from quality_control.utils import make_progress_calculator +from quality_control.parsing import ( + take, FileType, strain_names, collect_errors) + + +def make_progress_indicator( + redis_connection: Redis, job_id: str, progress_calc_fn: Callable) -> Callable: + def __indicator__(linenumber, linetext): + progress = progress_calc_fn(linenumber, linetext) + redis_connection.hset(name=job_id, mapping=progress._asdict()) + + return progress + + return __indicator__ + +def cli_args_valid(args): + if not os.path.exists(args.filepath): + print(f"The file '{args.filepath}' does not exist.", file=sys.stderr) + return None + + if not os.path.exists(args.strainsfile): + print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr) + return None + + try: + conn = Redis.from_url(args.redisurl) + except ConnectionError as ce: + print(traceback.format_exc(), file=sys.stderr) + return None + + return args + +def process_cli_arguments(): + parser = cli_argument_parser() + parser.prog = "worker" + parser.add_argument( + "redisurl", default="redis:///", help="URL to the redis server") + parser.add_argument("job_id", help="The id of the job being processed") + + return cli_args_valid(parser.parse_args()) + +def main(): + args = process_cli_arguments() + if args is None: + print("Quiting due to errors!", file=sys.stderr) + return 1 + + with Redis.from_url(args.redisurl) as redis_conn: + progress_calculator = make_progress_calculator( + os.stat(args.filepath).st_size) + progress_indicator = make_progress_indicator( + redis_conn, args.job_id, progress_calculator) + count = args.count + filepath = args.filepath + filetype = ( + FileType.AVERAGE if args.filetype == "average" + else FileType.STANDARD_ERROR) + strains = strain_names(args.strainsfile) + + redis_conn.hset( + name=args.job_id, key="status", value="Processing") + redis_conn.hset( + name=args.job_id, key="message", value="Collecting errors") + + if count > 0: + errors = take( + collect_errors(filepath, filetype, strains, progress_indicator), + count) + else: + errors = collect_errors(filepath, filetype, strains, progress_indicator) + + redis_conn.hset( + name=args.job_id, key="errors", value=jsonpickle.encode(errors)) + redis_conn.hset(name=args.job_id, key="status", value="success") + + return 0 + +if __name__ == "__main__": + main() |