path: root/qc_app/samples.py
author    Frederick Muriuki Muriithi  2023-12-18 12:01:38 +0300
committer Frederick Muriuki Muriithi  2023-12-18 12:01:38 +0300
commit    d4e45e5bf9877957c36b8e0a537ba3819a674614 (patch)
tree      1850dbe6d2733497501db09e22f9f53daa0f9943 /qc_app/samples.py
parent    096ab99a2d961e864f340c39252b4c8eecc72191 (diff)
download  gn-uploader-d4e45e5bf9877957c36b8e0a537ba3819a674614.tar.gz
Samples: Hook-up external async script to upload the samples.
Diffstat (limited to 'qc_app/samples.py')
-rw-r--r--  qc_app/samples.py  113
1 file changed, 98 insertions(+), 15 deletions(-)
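
Note: this commit moves the samples insertion out of the request handler and into an external script (scripts.insert_samples) that is launched as an asynchronous job and monitored via new status/failure routes. For orientation, here is a minimal, self-contained sketch of the command list assembled by the new build_sample_upload_job helper shown in the diff below; the SQL/Redis URIs and the example IDs are placeholders, not values from this repository.

    import sys
    from pathlib import Path

    # Placeholder configuration; in the application these come from
    # app.config["SQL_URI"] and app.config["REDIS_URL"].
    SQL_URI = "mysql://user:password@localhost/gn_db"
    REDIS_URL = "redis://localhost:6379/0"

    def sketch_sample_upload_command(speciesid: int, populationid: int,
                                     samplesfile: Path, separator: str,
                                     firstlineheading: bool,
                                     quotechar: str) -> list:
        """Mirror of build_sample_upload_job, using the placeholders above."""
        return [
            sys.executable, "-m", "scripts.insert_samples", SQL_URI,
            str(speciesid), str(populationid), str(samplesfile.absolute()),
            separator, f"--redisuri={REDIS_URL}", f"--quotechar={quotechar}"
        ] + (["--firstlineheading"] if firstlineheading else [])

    # Example: species 1, population 5, comma-separated file with a header row.
    print(sketch_sample_upload_command(1, 5, Path("samples.csv"), ",", True, '"'))
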
diff --git a/qc_app/samples.py b/qc_app/samples.py
index 88a0fde..e1e88bd 100644
--- a/qc_app/samples.py
+++ b/qc_app/samples.py
@@ -1,9 +1,13 @@
"""Code regarding samples"""
+import os
+import sys
import csv
+import uuid
from pathlib import Path
from typing import Iterator
import MySQLdb as mdb
+from redis import Redis
from MySQLdb.cursors import DictCursor
from flask import (
flash,
@@ -16,9 +20,13 @@ from flask import (
from quality_control.parsing import take
-from .files import save_file
-from .dbinsert import species_by_id, groups_by_species
-from .db_utils import with_db_connection, database_connection
+from qc_app import jobs
+from qc_app.files import save_file
+from qc_app.dbinsert import species_by_id, groups_by_species
+from qc_app.db_utils import (
+ with_db_connection,
+ database_connection,
+ with_redis_connection)
samples = Blueprint("samples", __name__)
@@ -141,6 +149,7 @@ def save_samples_data(conn: mdb.Connection,
file_data: Iterator[dict]):
"""Save the samples to DB."""
data = ({**row, "SpeciesId": speciesid} for row in file_data)
+ total = 0
with conn.cursor() as cursor:
while True:
batch = take(data, 5000)
@@ -152,6 +161,8 @@ def save_samples_data(conn: mdb.Connection,
" %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s"
") ON DUPLICATE KEY UPDATE Name=Name",
batch)
+ total += len(batch)
+ print(f"\tSaved {total} samples total so far.")
def cross_reference_samples(conn: mdb.Connection,
species_id: int,
@@ -162,7 +173,8 @@ def cross_reference_samples(conn: mdb.Connection,
cursor.execute(
"SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s",
(population_id,))
- last_order_id = cursor.fetchone()["loid"]
+ last_order_id = (cursor.fetchone()["loid"] or 10)
+ total = 0
while True:
batch = take(strain_names, 5000)
if len(batch) == 0:
@@ -192,6 +204,24 @@ def cross_reference_samples(conn: mdb.Connection,
")",
params)
last_order_id += (len(params) * 10)
+ total += len(batch)
+ print(f"\t{total} total samples cross-referenced to the population "
+ "so far.")
+
+def build_sample_upload_job(# pylint: disable=[too-many-arguments]
+ speciesid: int,
+ populationid: int,
+ samplesfile: Path,
+ separator: str,
+ firstlineheading: bool,
+ quotechar: str):
+ """Define the async command to run the actual samples data upload."""
+ return [
+ sys.executable, "-m", "scripts.insert_samples", app.config["SQL_URI"],
+ str(speciesid), str(populationid), str(samplesfile.absolute()),
+ separator, f"--redisuri={app.config['REDIS_URL']}",
+ f"--quotechar={quotechar}"
+ ] + (["--firstlineheading"] if firstlineheading else [])
@samples.route("/upload/samples", methods=["POST"])
def upload_samples():
@@ -226,14 +256,67 @@ def upload_samples():
flash("You need to provide a separator character.", "alert-error")
return samples_uploads_page
- save_samples_data(
- conn,
- species["SpeciesId"],
- read_samples_file(samples_file, separator, firstlineheading))
- cross_reference_samples(
- conn,
- species["SpeciesId"],
- population["InbredSetId"],
- (row["Name"] for row in read_samples_file(samples_file, separator, firstlineheading)))
-
- return "SUCCESS: Respond with a better UI than this."
+ quotechar = (request.form.get("field_delimiter", '"') or '"')
+
+ redisuri = app.config["REDIS_URL"]
+ with Redis.from_url(redisuri, decode_responses=True) as rconn:
+ the_job = jobs.launch_job(
+ jobs.initialise_job(
+ rconn,
+ str(uuid.uuid4()),
+ build_sample_upload_job(
+ species["SpeciesId"],
+ population["InbredSetId"],
+ samples_file,
+ separator,
+ firstlineheading,
+ quotechar),
+ "samples_upload",
+ app.config["JOBS_TTL_SECONDS"],
+ {"job_name": f"Samples Upload: {samples_file.name}"}),
+ redisuri,
+ f"{app.config['UPLOAD_FOLDER']}/job_errors")
+ return redirect(url_for(
+ "samples.upload_status", job_id=the_job["job_id"]))
+
+@samples.route("/upload/status/<uuid:job_id>", methods=["GET"])
+def upload_status(job_id: uuid.UUID):
+ """Check on the status of a samples upload job."""
+ job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id)))
+ if job:
+ status = job["status"]
+ if status == "success":
+ return render_template("samples/upload-success.html", job=job)
+
+ if status == "error":
+ return redirect(url_for("samples.upload_failure", job_id=job_id))
+
+ error_filename = Path(jobs.error_filename(
+ job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+ if error_filename.exists():
+ stat = os.stat(error_filename)
+ if stat.st_size > 0:
+ return redirect(url_for(
+ "samples.upload_failure", job_id=job_id))
+
+ return render_template(
+ "samples/upload-progress.html",
+ job=job) # maybe also handle this?
+
+ return render_template("no_such_job.html", job_id=job_id), 400
+
+@samples.route("/upload/failure/<uuid:job_id>", methods=["GET"])
+def upload_failure(job_id: uuid.UUID):
+ """Display the errors of the samples upload failure."""
+ job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id)))
+ if not bool(job):
+ return render_template("no_such_job.html", job_id=job_id), 400
+
+ error_filename = Path(jobs.error_filename(
+ job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+ if error_filename.exists():
+ stat = os.stat(error_filename)
+ if stat.st_size > 0:
+ return render_template("worker_failure.html", job_id=job_id)
+
+ return render_template("samples/upload-failure.html", job=job)
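
Note: both upload_status and upload_failure above fall back to checking whether the job's error file under {UPLOAD_FOLDER}/job_errors is non-empty when Redis does not report an explicit status. A minimal sketch of that check follows; the <job_id>.error file name is an assumption for illustration, since the real path comes from jobs.error_filename().

    import os
    from pathlib import Path

    def job_has_errors(job_id: str, error_dir: str) -> bool:
        """Return True if the job's error file exists and is non-empty.

        The file name used here is hypothetical; in the application the
        path is produced by jobs.error_filename().
        """
        error_file = Path(error_dir) / f"{job_id}.error"  # hypothetical naming
        return error_file.exists() and os.stat(error_file).st_size > 0

    # Example with placeholder values:
    print(job_has_errors("00000000-0000-0000-0000-000000000000", "/tmp/job_errors"))
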