From d4e45e5bf9877957c36b8e0a537ba3819a674614 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 18 Dec 2023 12:01:38 +0300 Subject: Samples: Hook-up external async script to upload the samples. --- qc_app/db_utils.py | 9 +++- qc_app/jobs.py | 6 +-- qc_app/samples.py | 113 +++++++++++++++++++++++++++++++++++++------ qc_app/static/css/styles.css | 3 +- scripts/insert_samples.py | 12 +++-- 5 files changed, 118 insertions(+), 25 deletions(-) diff --git a/qc_app/db_utils.py b/qc_app/db_utils.py index 75b6b73..ef26398 100644 --- a/qc_app/db_utils.py +++ b/qc_app/db_utils.py @@ -2,10 +2,11 @@ import logging import traceback import contextlib +from urllib.parse import urlparse from typing import Any, Tuple, Optional, Iterator, Callable -from urllib.parse import urlparse import MySQLdb as mdb +from redis import Redis from flask import current_app as app def parse_db_url(db_url) -> Tuple: @@ -37,3 +38,9 @@ def with_db_connection(func: Callable[[mdb.Connection], Any]) -> Any: """Call `func` with a MySQDdb database connection.""" with database_connection(app.config["SQL_URI"]) as conn: return func(conn) + +def with_redis_connection(func: Callable[[Redis], Any]) -> Any: + """Call `func` with a redis connection.""" + redisuri = app.config["REDIS_URL"] + with Redis.from_url(redisuri, decode_responses=True) as rconn: + return func(rconn) diff --git a/qc_app/jobs.py b/qc_app/jobs.py index 10ba832..c5bf5e5 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -12,7 +12,7 @@ def error_filename(job_id, error_dir): "Compute the path of the file where errors will be dumped." return f"{error_dir}/job_{job_id}.error" -def __init_job__(# pylint: disable=[too-many-arguments] +def initialise_job(# pylint: disable=[too-many-arguments] redis_conn: Redis, job_id: str, command: list, job_type: str, ttl_seconds: int, extra_meta: dict) -> dict: "Initialise a job 'object' and put in on redis" @@ -33,7 +33,7 @@ def build_file_verification_job( sys.executable, "-m", "scripts.validate_file", filetype, filepath, redisurl, job_id ] - return __init_job__( + return initialise_job( redis_conn, job_id, command, "file-verification", ttl_seconds, { "filetype": filetype, "filename": os.path.basename(filepath), "percent": 0 @@ -48,7 +48,7 @@ def data_insertion_job(# pylint: disable=[too-many-arguments] sys.executable, "-m", "scripts.insert_data", filetype, filepath, speciesid, platformid, datasetid, databaseuri, redisuri ] - return __init_job__( + return initialise_job( redis_conn, str(uuid4()), command, "data-insertion", ttl_seconds, { "filename": os.path.basename(filepath), "filetype": filetype, "totallines": totallines diff --git a/qc_app/samples.py b/qc_app/samples.py index 88a0fde..e1e88bd 100644 --- a/qc_app/samples.py +++ b/qc_app/samples.py @@ -1,9 +1,13 @@ """Code regarding samples""" +import os +import sys import csv +import uuid from pathlib import Path from typing import Iterator import MySQLdb as mdb +from redis import Redis from MySQLdb.cursors import DictCursor from flask import ( flash, @@ -16,9 +20,13 @@ from flask import ( from quality_control.parsing import take -from .files import save_file -from .dbinsert import species_by_id, groups_by_species -from .db_utils import with_db_connection, database_connection +from qc_app import jobs +from qc_app.files import save_file +from qc_app.dbinsert import species_by_id, groups_by_species +from qc_app.db_utils import ( + with_db_connection, + database_connection, + with_redis_connection) samples = Blueprint("samples", __name__) @@ -141,6 +149,7 @@ def save_samples_data(conn: mdb.Connection, file_data: Iterator[dict]): """Save the samples to DB.""" data = ({**row, "SpeciesId": speciesid} for row in file_data) + total = 0 with conn.cursor() as cursor: while True: batch = take(data, 5000) @@ -152,6 +161,8 @@ def save_samples_data(conn: mdb.Connection, " %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s" ") ON DUPLICATE KEY UPDATE Name=Name", batch) + total += len(batch) + print(f"\tSaved {total} samples total so far.") def cross_reference_samples(conn: mdb.Connection, species_id: int, @@ -162,7 +173,8 @@ def cross_reference_samples(conn: mdb.Connection, cursor.execute( "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s", (population_id,)) - last_order_id = cursor.fetchone()["loid"] + last_order_id = (cursor.fetchone()["loid"] or 10) + total = 0 while True: batch = take(strain_names, 5000) if len(batch) == 0: @@ -192,6 +204,24 @@ def cross_reference_samples(conn: mdb.Connection, ")", params) last_order_id += (len(params) * 10) + total += len(batch) + print(f"\t{total} total samples cross-referenced to the population " + "so far.") + +def build_sample_upload_job(# pylint: disable=[too-many-arguments] + speciesid: int, + populationid: int, + samplesfile: Path, + separator: str, + firstlineheading: bool, + quotechar: str): + """Define the async command to run the actual samples data upload.""" + return [ + sys.executable, "-m", "scripts.insert_samples", app.config["SQL_URI"], + str(speciesid), str(populationid), str(samplesfile.absolute()), + separator, f"--redisuri={app.config['REDIS_URL']}", + f"--quotechar={quotechar}" + ] + (["--firstlineheading"] if firstlineheading else []) @samples.route("/upload/samples", methods=["POST"]) def upload_samples(): @@ -226,14 +256,67 @@ def upload_samples(): flash("You need to provide a separator character.", "alert-error") return samples_uploads_page - save_samples_data( - conn, - species["SpeciesId"], - read_samples_file(samples_file, separator, firstlineheading)) - cross_reference_samples( - conn, - species["SpeciesId"], - population["InbredSetId"], - (row["Name"] for row in read_samples_file(samples_file, separator, firstlineheading))) - - return "SUCCESS: Respond with a better UI than this." + quotechar = (request.form.get("field_delimiter", '"') or '"') + + redisuri = app.config["REDIS_URL"] + with Redis.from_url(redisuri, decode_responses=True) as rconn: + the_job = jobs.launch_job( + jobs.initialise_job( + rconn, + str(uuid.uuid4()), + build_sample_upload_job( + species["SpeciesId"], + population["InbredSetId"], + samples_file, + separator, + firstlineheading, + quotechar), + "samples_upload", + app.config["JOBS_TTL_SECONDS"], + {"job_name": f"Samples Upload: {samples_file.name}"}), + redisuri, + f"{app.config['UPLOAD_FOLDER']}/job_errors") + return redirect(url_for( + "samples.upload_status", job_id=the_job["job_id"])) + +@samples.route("/upload/status/", methods=["GET"]) +def upload_status(job_id: uuid.UUID): + """Check on the status of a samples upload job.""" + job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id))) + if job: + status = job["status"] + if status == "success": + return render_template("samples/upload-success.html", job=job) + + if status == "error": + return redirect(url_for("samples.upload_failure", job_id=job_id)) + + error_filename = Path(jobs.error_filename( + job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")) + if error_filename.exists(): + stat = os.stat(error_filename) + if stat.st_size > 0: + return redirect(url_for( + "samples.upload_failure", job_id=job_id)) + + return render_template( + "samples/upload-progress.html", + job=job) # maybe also handle this? + + return render_template("no_such_job.html", job_id=job_id), 400 + +@samples.route("/upload/failure/", methods=["GET"]) +def upload_failure(job_id: uuid.UUID): + """Display the errors of the samples upload failure.""" + job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id))) + if not bool(job): + return render_template("no_such_job.html", job_id=job_id), 400 + + error_filename = Path(jobs.error_filename( + job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")) + if error_filename.exists(): + stat = os.stat(error_filename) + if stat.st_size > 0: + return render_template("worker_failure.html", job_id=job_id) + + return render_template("samples/upload-failure.html", job=job) diff --git a/qc_app/static/css/styles.css b/qc_app/static/css/styles.css index c9f6737..4c48a8a 100644 --- a/qc_app/static/css/styles.css +++ b/qc_app/static/css/styles.css @@ -170,5 +170,6 @@ form fieldset legend { padding: 1em; font-weight: bold; border-radius: 0.8em; - width: 55em; + width: 90%; + overflow: scroll; } diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py index 43c6a38..6b6faf1 100644 --- a/scripts/insert_samples.py +++ b/scripts/insert_samples.py @@ -42,21 +42,25 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments] firstlineheading: bool, quotechar: str): """Insert the samples into the database.""" + print("Checking for errors:") species = species_by_id(conn, speciesid) if not bool(species): logging.error("Species with id '%s' does not exist.", str(speciesid)) return 1 + print(f"\tSpecies with ID '{speciesid}' found") population = population_by_id(conn, populationid) if not bool(population): logging.error("Population with id '%s' does not exist.", str(populationid)) return 1 - logging.info("Inserting samples ...") + print(f"\tPopulations with ID '{populationid}' found") + print("No errors found. Continuing...") + print("\nInserting samples ...") save_samples_data( conn, speciesid, read_samples_file(samplesfile, separator, firstlineheading)) - logging.info("Cross-referencing samples with their populations.") + print("Cross-referencing samples with their populations.") cross_reference_samples( conn, speciesid, @@ -66,7 +70,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments] separator, firstlineheading, quotechar=quotechar))) - + print("Samples upload successfully completed.") return 0 if __name__ == "__main__": @@ -133,8 +137,6 @@ if __name__ == "__main__": with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, database_connection(args.databaseuri) as dbconn): - print("We got here...") - print(args) return insert_samples(dbconn, rconn, args.speciesid, -- cgit v1.2.3