From e6895f5bac672d2e1d2a04fe8118fa55c3a40b91 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Tue, 26 Apr 2022 09:43:18 +0300 Subject: Queue file parsing jobs Enable the queuing of file parsing jobs, since the files could be really large and take a long time to parse and present results. * etc/default_config.py: Add default config for redis server * manifest.scm: Add redis, and rq as dependencies * qc_app/__init__.py * qc_app/jobs.py: module to hold utilities for management of the jobs * qc_app/parse.py: Enqueue the job - extract file-parsing code to callable function * qc_app/templates/base.html: Enable addition of extra meta tags * qc_app/templates/job_progress.html: template to display job progress * qc_app/templates/no_such_job.html: template to indicate when a job id is invalid * quality_control/parsing.py: Add the total size parsed so far --- etc/default_config.py | 1 + manifest.scm | 2 + qc_app/__init__.py | 1 + qc_app/jobs.py | 19 +++++++++ qc_app/parse.py | 84 ++++++++++++++++++++++++++++++-------- qc_app/templates/base.html | 1 + qc_app/templates/job_progress.html | 19 +++++++++ qc_app/templates/no_such_job.html | 14 +++++++ quality_control/parsing.py | 6 ++- 9 files changed, 127 insertions(+), 20 deletions(-) create mode 100644 qc_app/jobs.py create mode 100644 qc_app/templates/job_progress.html create mode 100644 qc_app/templates/no_such_job.html diff --git a/etc/default_config.py b/etc/default_config.py index 76df031..28da532 100644 --- a/etc/default_config.py +++ b/etc/default_config.py @@ -8,3 +8,4 @@ import os LOG_LEVEL = os.getenv("LOG_LEVEL", "WARNING") SECRET_KEY = b"" UPLOAD_FOLDER = "/tmp/qc_app_files" +REDIS_URL = "redis://" diff --git a/manifest.scm b/manifest.scm index d5ce588..d93902a 100644 --- a/manifest.scm +++ b/manifest.scm @@ -1,6 +1,8 @@ (specifications->manifest (list "python" + "python-rq" "python-mypy" + "python-redis" "python-flask" "python-pylint" "python-pytest" diff --git a/qc_app/__init__.py 
b/qc_app/__init__.py index 9b5ed76..35cc422 100644 --- a/qc_app/__init__.py +++ b/qc_app/__init__.py @@ -1,6 +1,7 @@ """The Quality-Control Web Application entry point""" import os + from flask import Flask from .entry import entrybp diff --git a/qc_app/jobs.py b/qc_app/jobs.py new file mode 100644 index 0000000..dbeb9ce --- /dev/null +++ b/qc_app/jobs.py @@ -0,0 +1,19 @@ +from rq import Queue +from redis import Redis +from flask import current_app as app + +def enqueue_job(delayed_fn, *args, **kwargs): + with Redis.from_url(app.config["REDIS_URL"]) as rconn: + queue = Queue("qcapp_queue", connection=rconn) + job = queue.enqueue(delayed_fn, *args, **kwargs) + + job.meta["status"] = "enqueued" + job.save_meta() + return job + +def job(job_id): + with Redis.from_url(app.config["REDIS_URL"]) as rconn: + queue = Queue("qcapp_queue", connection=rconn) + job = queue.fetch_job(job_id) + + return job diff --git a/qc_app/parse.py b/qc_app/parse.py index aa88260..1ebe637 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -1,8 +1,16 @@ """File parsing module""" import os -from flask import request, url_for, redirect, Blueprint, current_app as app +from rq import get_current_job +from flask import ( + request, + url_for, + redirect, + Blueprint, + render_template, + current_app as app) +from . 
import jobs from quality_control.errors import ParseError from quality_control.parsing import ( FileType, @@ -12,11 +20,37 @@ from quality_control.parsing import ( parsebp = Blueprint("parse", __name__) +def queued_parse(filepath, filetype): + job = get_current_job() + job.meta["filename"] = os.path.basename(filepath) + job.meta["status"] = "in-progress" + job.save_meta() + filesize = os.stat(filepath).st_size + try: + parsed = parse_file( + filepath, filetype, strain_names(parse_strains("strains.csv"))) + for line, curr_size in parsed: + job.meta["progress"] = (curr_size/filesize) * 100 + job.meta["status"] = f"Parsed {curr_size} bytes" + job.save_meta() + + os.remove(filepath) + job.meta["progress"] = 100 + job.meta["status"] = "success" + job.meta["results"] = {"message": "no errors found"} + job.save_meta() + except ParseError as pe: + pe_dict = pe.args[0] + job.meta["status"] = "parse-error" + job.meta["results"] = { + "filename": filename, "filetype": filetype, + "position": pe_dict["position"] + } + job.save_meta() + @parsebp.route("/parse", methods=["GET"]) def parse(): """Trigger file parsing""" - # TODO: figure out how to redirect with post - # TODO: figure out how to stat file and get: total number of lines # TODO: Maybe implement external process to parse the files errors = False filename = request.args.get("filename") @@ -39,22 +73,36 @@ def parse(): filetype = ( FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) - try: - parsed = parse_file( - filepath, filetype, strain_names(parse_strains("strains.csv"))) - for line in parsed: - pass - os.remove(filepath) - return redirect(url_for( - "parse.success", filename=filename, filetype=filetype)) - except ParseError as pe: - pe_dict = pe.args[0] - return redirect(url_for( - "parse.fail", filename = filename, filetype = filetype, - position = pe_dict["position"])) + job = jobs.enqueue_job("qc_app.parse.queued_parse", filepath, filetype) + job.meta["filename"] = filename + job.save_meta() 
+ return redirect(url_for("parse.parse_status", job_id=job.get_id())) + +@parsebp.route("/status/", methods=["GET"]) +def parse_status(job_id): + job = jobs.job(job_id) + if job: + job_id = job.get_id() + progress = job.meta.get("progress", 0) + status = job.meta["status"] + filename = job.meta.get("filename", "uploaded file") + if status == "success": + return redirect(url_for("parse.results", job_id=job_id)) + + if status == "parse-error": + return redirect(url_for("parse.fail", job_id=job_id)) + + return render_template( + "job_progress.html", + job_id = job_id, + job_status = status, + progress = progress, + job_name = f"Parsing '{filename}'") + + return render_template("no_such_job.html", job_id=job_id) -@parsebp.route("/success", methods=["GET"]) -def success(): +@parsebp.route("/results/", methods=["GET"]) +def results(job_id): """Indicates success if parsing the file is successful""" return "STUB: Parse success!!!" diff --git a/qc_app/templates/base.html b/qc_app/templates/base.html index 67ba6b5..623141a 100644 --- a/qc_app/templates/base.html +++ b/qc_app/templates/base.html @@ -4,6 +4,7 @@ + {%block extrameta%}{%endblock%} QC: {%block title%}{%endblock%} diff --git a/qc_app/templates/job_progress.html b/qc_app/templates/job_progress.html new file mode 100644 index 0000000..1c6aa32 --- /dev/null +++ b/qc_app/templates/job_progress.html @@ -0,0 +1,19 @@ +{%extends "base.html"%} + +{%block extrameta%} + +{%endblock%} + +{%block title%}Job Status{%endblock%} + +{%block contents%} +

{{job_name}}

+ + +{{job_status}}
+ + +{{progress}} +{{"%.2f" | format(progress)}}% + +{%endblock%} diff --git a/qc_app/templates/no_such_job.html b/qc_app/templates/no_such_job.html new file mode 100644 index 0000000..42a2d48 --- /dev/null +++ b/qc_app/templates/no_such_job.html @@ -0,0 +1,14 @@ +{%extends "base.html"%} + +{%block extrameta%} + +{%endblock%} + +{%block title%}No Such Job{%endblock%} + +{%block contents%} +

No Such Job: {{job_id}}

+ +

No job with the id '{{job_id}}' was found!

+ +{%endblock%} diff --git a/quality_control/parsing.py b/quality_control/parsing.py index eb4c989..a4edb0f 100644 --- a/quality_control/parsing.py +++ b/quality_control/parsing.py @@ -66,8 +66,10 @@ def parse_file(filepath: str, filetype: FileType, strains: list): seek_pos = seek_pos + len(line) continue - yield LINE_PARSERS[filetype]( - tuple(field.strip() for field in line.split("\t"))) + yield ( + LINE_PARSERS[filetype]( + tuple(field.strip() for field in line.split("\t"))), + seek_pos + len(line)) seek_pos = seek_pos + len(line) except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err: raise ParseError({ -- cgit v1.2.3