diff options
author | Frederick Muriuki Muriithi | 2022-04-26 15:18:26 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-04-26 15:18:26 +0300 |
commit | ea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d (patch) | |
tree | f4dc7e29b8e49a1cbf2412d0bcab55294957e021 /qc_app | |
parent | e6895f5bac672d2e1d2a04fe8118fa55c3a40b91 (diff) | |
download | gn-uploader-ea70b7a7db42d51fa7f22f3dcb6d2aca6d8a795d.tar.gz |
Fix issues caught processing the jobs
* Create and push the application context for the worker functions
* Fix the update of meta fields
Diffstat (limited to 'qc_app')
-rw-r--r-- | qc_app/__init__.py | 10 | ||||
-rw-r--r-- | qc_app/jobs.py | 33 | ||||
-rw-r--r-- | qc_app/parse.py | 90 | ||||
-rw-r--r-- | qc_app/templates/job_progress.html | 2 |
4 files changed, 107 insertions, 28 deletions
diff --git a/qc_app/__init__.py b/qc_app/__init__.py index 35cc422..7f423c2 100644 --- a/qc_app/__init__.py +++ b/qc_app/__init__.py @@ -7,6 +7,16 @@ from flask import Flask from .entry import entrybp from .parse import parsebp +def instance_path(): + """Retrieve the `instance_path`. Raise an exception if not defined.""" + path = os.getenv("QCAPP_INSTANCE_PATH", None) + if path is None: + raise Exception(( + "Configuration Error: Set the `QCAPP_INSTANCE_PATH` environment " + "variable.")) + + return path + def create_app(instance_path): """The application factory""" app = Flask( diff --git a/qc_app/jobs.py b/qc_app/jobs.py index dbeb9ce..908c244 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -1,19 +1,46 @@ from rq import Queue +from rq.job import Job from redis import Redis from flask import current_app as app def enqueue_job(delayed_fn, *args, **kwargs): with Redis.from_url(app.config["REDIS_URL"]) as rconn: + job = Job.create( + delayed_fn, args, **{ + key: val for key, val in kwargs.items() + if key != "additional_jobs_meta" + }, + connection = rconn) + job.meta["status"] = "enqueued" + job.meta["progress"] = 0 + if "additional_jobs_meta" in kwargs: + for key, val in kwargs["additional_jobs_meta"].items(): + job.meta[key] = val + + job.save_meta() + queue = Queue("qcapp_queue", connection=rconn) - job = queue.enqueue(delayed_fn, *args, **kwargs) + queue.enqueue_job(job) - job.meta["status"] = "enqueued" - job.save_meta() return job def job(job_id): with Redis.from_url(app.config["REDIS_URL"]) as rconn: queue = Queue("qcapp_queue", connection=rconn) job = queue.fetch_job(job_id) + job.refresh() + + return job + +def update_meta(stale_job, **kwargs): + with Redis.from_url(app.config["REDIS_URL"]) as rconn: + queue = Queue("qcapp_queue", connection=rconn) + job = queue.fetch_job(stale_job.get_id()) + job.refresh() + meta_dict = {**stale_job.meta, **job.meta, **kwargs} + for key, val in meta_dict.items(): + job.meta[key] = val + + job.save_meta() return job diff --git a/qc_app/parse.py b/qc_app/parse.py index 1ebe637..ffdd123 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -16,37 +16,37 @@ from quality_control.parsing import ( FileType, parse_file, strain_names, + parse_errors, parse_strains) parsebp = Blueprint("parse", __name__) def queued_parse(filepath, filetype): + from qc_app import create_app, instance_path + app = create_app(instance_path()) + app.app_context().push() + job = get_current_job() - job.meta["filename"] = os.path.basename(filepath) - job.meta["status"] = "in-progress" + jobs.update_meta(job, status = "in-progress", progress = 0) job.save_meta() - filesize = os.stat(filepath).st_size try: parsed = parse_file( filepath, filetype, strain_names(parse_strains("strains.csv"))) for line, curr_size in parsed: - job.meta["progress"] = (curr_size/filesize) * 100 - job.meta["status"] = f"Parsed {curr_size} bytes" - job.save_meta() + jobs.update_meta( + job, progress = (curr_size/job.meta["filesize"]) * 100, + message = f"Parsed {curr_size} bytes") os.remove(filepath) - job.meta["progress"] = 100 - job.meta["status"] = "success" - job.meta["results"] = {"message": "no errors found"} - job.save_meta() + jobs.update_meta( + job, progress = 10, status = "success", message = "no errors found") except ParseError as pe: pe_dict = pe.args[0] - job.meta["status"] = "parse-error" - job.meta["results"] = { - "filename": filename, "filetype": filetype, - "position": pe_dict["position"] - } - job.save_meta() + jobs.update_meta( + job, status = "parse-error", results = { + "filename": os.path.basename(filepath), "filetype": filetype, + "position": pe_dict["position"] + }) @parsebp.route("/parse", methods=["GET"]) def parse(): @@ -73,9 +73,10 @@ def parse(): filetype = ( FileType.AVERAGE if filetype == "average" else FileType.STANDARD_ERROR) - job = jobs.enqueue_job("qc_app.parse.queued_parse", filepath, filetype) - job.meta["filename"] = filename - job.save_meta() + job = jobs.enqueue_job( + "qc_app.parse.queued_parse", filepath, filetype, + additional_jobs_meta = { + "filename": filename, "filesize": os.stat(filepath).st_size}) return redirect(url_for("parse.parse_status", job_id=job.get_id())) @parsebp.route("/status/<job_id>", methods=["GET"]) @@ -83,7 +84,7 @@ def parse_status(job_id): job = jobs.job(job_id) if job: job_id = job.get_id() - progress = job.meta.get("progress", 0) + progress = job.meta["progress"] status = job.meta["status"] filename = job.meta.get("filename", "uploaded file") if status == "success": @@ -97,6 +98,7 @@ def parse_status(job_id): job_id = job_id, job_status = status, progress = progress, + message = job.meta.get("message", ""), job_name = f"Parsing '{filename}'") return render_template("no_such_job.html", job_id=job_id) @@ -106,7 +108,47 @@ def results(job_id): """Indicates success if parsing the file is successful""" return "STUB: Parse success!!!" -@parsebp.route("/fail", methods=["GET"]) -def fail(): - """Indicates success if parsing the file is successful""" - return "STUB: Parse Failure!!!" +def queued_collect_errors(filepath, filetype, seek_pos=0): + from qc_app import create_app, instance_path + app = create_app(instance_path()) + app.app_context().push() + + job = get_current_job() + errors = [] + count = 0 + + for error in parse_errors( + filepath, filetype, strain_names(parse_strains("strains.csv")), + seek_pos): + count = count + 1 + jobs.update_meta( + job, message = f"Collected {count} errors", progress = ( + (error["position"] / job.meta["filesize"]) * 100)) + errors.append(error) + + jobs.update_meta(job, errors = errors, progress = 100, status = "success") + +@parsebp.route("/fail/<job_id>", methods=["GET"]) +def fail(job_id): + """Handle parsing failure""" + old_job = jobs.job(job_id) + if old_job: + old_job.refresh() + job_id = old_job.get_id() + progress = old_job.meta.get("progress", 0) + status = old_job.meta["status"] + results = old_job.meta["results"] + filename = old_job.meta.get("filename", "uploaded file") + + new_job = jobs.enqueue_job( + "qc_app.parse.queued_collect_errors", + os.path.join( + app.config["UPLOAD_FOLDER"], old_job.meta["filename"]), + results["filetype"], results["position"], + additional_jobs_meta = { + "status": "Collecting Errors", + "filename": old_job.meta["filename"], + "filesize": old_job.meta["filesize"]}) + return redirect(url_for("parse.parse_status", job_id=new_job.get_id())) + + return render_template("no_such_job.html", job_id=job_id) diff --git a/qc_app/templates/job_progress.html b/qc_app/templates/job_progress.html index 1c6aa32..26dfe6f 100644 --- a/qc_app/templates/job_progress.html +++ b/qc_app/templates/job_progress.html @@ -10,7 +10,7 @@ <h1 class="heading">{{job_name}}</h2> <label for="job_status">status:</label> -<span>{{job_status}}</span><br /> +<span>{{job_status}}: {{message}}</span><br /> <label for="job_{{job_id}}">parsing: </label> <progress id="job_{{job_id}}" value="{{progress}}">{{progress}}</progress> |