From d4e45e5bf9877957c36b8e0a537ba3819a674614 Mon Sep 17 00:00:00 2001
From: Frederick Muriuki Muriithi
Date: Mon, 18 Dec 2023 12:01:38 +0300
Subject: Samples: Hook-up external async script to upload the samples.

---
 qc_app/samples.py | 113 ++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 98 insertions(+), 15 deletions(-)

(limited to 'qc_app/samples.py')

diff --git a/qc_app/samples.py b/qc_app/samples.py
index 88a0fde..e1e88bd 100644
--- a/qc_app/samples.py
+++ b/qc_app/samples.py
@@ -1,9 +1,13 @@
 """Code regarding samples"""
+import os
+import sys
 import csv
+import uuid
 from pathlib import Path
 from typing import Iterator
 
 import MySQLdb as mdb
+from redis import Redis
 from MySQLdb.cursors import DictCursor
 from flask import (
     flash,
@@ -16,9 +20,13 @@ from flask import (
 
 from quality_control.parsing import take
 
-from .files import save_file
-from .dbinsert import species_by_id, groups_by_species
-from .db_utils import with_db_connection, database_connection
+from qc_app import jobs
+from qc_app.files import save_file
+from qc_app.dbinsert import species_by_id, groups_by_species
+from qc_app.db_utils import (
+    with_db_connection,
+    database_connection,
+    with_redis_connection)
 
 samples = Blueprint("samples", __name__)
 
@@ -141,6 +149,7 @@ def save_samples_data(conn: mdb.Connection,
                       file_data: Iterator[dict]):
     """Save the samples to DB."""
     data = ({**row, "SpeciesId": speciesid} for row in file_data)
+    total = 0
     with conn.cursor() as cursor:
         while True:
             batch = take(data, 5000)
@@ -152,6 +161,8 @@ def save_samples_data(conn: mdb.Connection,
                 " %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s"
                 ") ON DUPLICATE KEY UPDATE Name=Name",
                 batch)
+            total += len(batch)
+            print(f"\tSaved {total} samples total so far.")
 
 def cross_reference_samples(conn: mdb.Connection,
                             species_id: int,
@@ -162,7 +173,8 @@ def cross_reference_samples(conn: mdb.Connection,
         cursor.execute(
             "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s",
             (population_id,))
-        last_order_id = cursor.fetchone()["loid"]
+        last_order_id = (cursor.fetchone()["loid"] or 10)
+        total = 0
         while True:
             batch = take(strain_names, 5000)
             if len(batch) == 0:
@@ -192,6 +204,24 @@ def cross_reference_samples(conn: mdb.Connection,
                 ")",
                 params)
             last_order_id += (len(params) * 10)
+            total += len(batch)
+            print(f"\t{total} total samples cross-referenced to the population "
+                  "so far.")
+
+def build_sample_upload_job(# pylint: disable=[too-many-arguments]
+        speciesid: int,
+        populationid: int,
+        samplesfile: Path,
+        separator: str,
+        firstlineheading: bool,
+        quotechar: str):
+    """Define the async command to run the actual samples data upload."""
+    return [
+        sys.executable, "-m", "scripts.insert_samples", app.config["SQL_URI"],
+        str(speciesid), str(populationid), str(samplesfile.absolute()),
+        separator, f"--redisuri={app.config['REDIS_URL']}",
+        f"--quotechar={quotechar}"
+    ] + (["--firstlineheading"] if firstlineheading else [])
 
 @samples.route("/upload/samples", methods=["POST"])
 def upload_samples():
@@ -226,14 +256,67 @@ def upload_samples():
             flash("You need to provide a separator character.",
                   "alert-error")
             return samples_uploads_page
-        save_samples_data(
-            conn,
-            species["SpeciesId"],
-            read_samples_file(samples_file, separator, firstlineheading))
-        cross_reference_samples(
-            conn,
-            species["SpeciesId"],
-            population["InbredSetId"],
-            (row["Name"] for row in read_samples_file(samples_file, separator, firstlineheading)))
-
-        return "SUCCESS: Respond with a better UI than this."
+        quotechar = (request.form.get("field_delimiter", '"') or '"')
+
+        redisuri = app.config["REDIS_URL"]
+        with Redis.from_url(redisuri, decode_responses=True) as rconn:
+            the_job = jobs.launch_job(
+                jobs.initialise_job(
+                    rconn,
+                    str(uuid.uuid4()),
+                    build_sample_upload_job(
+                        species["SpeciesId"],
+                        population["InbredSetId"],
+                        samples_file,
+                        separator,
+                        firstlineheading,
+                        quotechar),
+                    "samples_upload",
+                    app.config["JOBS_TTL_SECONDS"],
+                    {"job_name": f"Samples Upload: {samples_file.name}"}),
+                redisuri,
+                f"{app.config['UPLOAD_FOLDER']}/job_errors")
+        return redirect(url_for(
+            "samples.upload_status", job_id=the_job["job_id"]))
+
+@samples.route("/upload/status/<uuid:job_id>", methods=["GET"])
+def upload_status(job_id: uuid.UUID):
+    """Check on the status of a samples upload job."""
+    job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id)))
+    if job:
+        status = job["status"]
+        if status == "success":
+            return render_template("samples/upload-success.html", job=job)
+
+        if status == "error":
+            return redirect(url_for("samples.upload_failure", job_id=job_id))
+
+        error_filename = Path(jobs.error_filename(
+            job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+        if error_filename.exists():
+            stat = os.stat(error_filename)
+            if stat.st_size > 0:
+                return redirect(url_for(
+                    "samples.upload_failure", job_id=job_id))
+
+        return render_template(
+            "samples/upload-progress.html",
+            job=job) # maybe also handle this?
+
+    return render_template("no_such_job.html", job_id=job_id), 400
+
+@samples.route("/upload/failure/<uuid:job_id>", methods=["GET"])
+def upload_failure(job_id: uuid.UUID):
+    """Display the errors of the samples upload failure."""
+    job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id)))
+    if not bool(job):
+        return render_template("no_such_job.html", job_id=job_id), 400
+
+    error_filename = Path(jobs.error_filename(
+        job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+    if error_filename.exists():
+        stat = os.stat(error_filename)
+        if stat.st_size > 0:
+            return render_template("worker_failure.html", job_id=job_id)
+
+    return render_template("samples/upload-failure.html", job=job)
--
cgit v1.2.3
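
For reference, the command list returned by build_sample_upload_job above (and handed to jobs.initialise_job/jobs.launch_job) would, with placeholder configuration values, look roughly like the sketch below. The interpreter path, SQL URI, Redis URI, file path and numeric IDs shown are illustrative placeholders, not values taken from this patch:

["/usr/bin/python3", "-m", "scripts.insert_samples",
 "mysql://user:password@localhost/gn_db",   # app.config["SQL_URI"] (placeholder)
 "1", "5",                                  # species id, population (InbredSet) id
 "/tmp/uploads/samples.csv",                # absolute path to the uploaded samples file
 ",",                                       # field separator chosen in the form
 "--redisuri=redis://localhost:6379/0",     # app.config["REDIS_URL"] (placeholder)
 '--quotechar="',
 "--firstlineheading"]                      # appended only when the first line holds headings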
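The scripts.insert_samples module itself ships outside this patch, so its real interface is not shown here. The following is only a minimal sketch, under the assumption that the script parses exactly the positional arguments and flags that build_sample_upload_job emits; the argument names and the parse_cli_args helper are assumptions for illustration:

"""Sketch of a CLI that would accept the command built by build_sample_upload_job."""
import argparse

def parse_cli_args():
    """Parse the arguments the web app passes to the external upload script."""
    parser = argparse.ArgumentParser(
        prog="insert_samples",
        description="Load samples from a delimited file into the database.")
    # Positional arguments, in the order build_sample_upload_job emits them.
    parser.add_argument("databaseuri", help="MySQL/MariaDB connection URI")
    parser.add_argument("speciesid", type=int, help="Species identifier")
    parser.add_argument("populationid", type=int,
                        help="InbredSet (population) identifier")
    parser.add_argument("samplesfile", help="Path to the uploaded samples file")
    parser.add_argument("separator", help="Field separator character")
    # Optional arguments, matching the flags the web app passes.
    parser.add_argument("--redisuri", default="redis://localhost:6379/0",
                        help="Redis URI used to report job status")
    parser.add_argument("--quotechar", default='"',
                        help="Character used to quote fields")
    parser.add_argument("--firstlineheading", action="store_true",
                        help="Treat the first line of the file as headings")
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_cli_args()
    print(args)  # placeholder: the real script performs the upload here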