aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-12-18 12:01:38 +0300
committerFrederick Muriuki Muriithi2023-12-18 12:01:38 +0300
commitd4e45e5bf9877957c36b8e0a537ba3819a674614 (patch)
tree1850dbe6d2733497501db09e22f9f53daa0f9943
parent096ab99a2d961e864f340c39252b4c8eecc72191 (diff)
downloadgn-uploader-d4e45e5bf9877957c36b8e0a537ba3819a674614.tar.gz
Samples: Hook-up external async script to upload the samples.
-rw-r--r--qc_app/db_utils.py9
-rw-r--r--qc_app/jobs.py6
-rw-r--r--qc_app/samples.py113
-rw-r--r--qc_app/static/css/styles.css3
-rw-r--r--scripts/insert_samples.py12
5 files changed, 118 insertions, 25 deletions
diff --git a/qc_app/db_utils.py b/qc_app/db_utils.py
index 75b6b73..ef26398 100644
--- a/qc_app/db_utils.py
+++ b/qc_app/db_utils.py
@@ -2,10 +2,11 @@
import logging
import traceback
import contextlib
+from urllib.parse import urlparse
from typing import Any, Tuple, Optional, Iterator, Callable
-from urllib.parse import urlparse
import MySQLdb as mdb
+from redis import Redis
from flask import current_app as app
def parse_db_url(db_url) -> Tuple:
@@ -37,3 +38,9 @@ def with_db_connection(func: Callable[[mdb.Connection], Any]) -> Any:
"""Call `func` with a MySQDdb database connection."""
with database_connection(app.config["SQL_URI"]) as conn:
return func(conn)
+
+def with_redis_connection(func: Callable[[Redis], Any]) -> Any:
+ """Call `func` with a redis connection."""
+ redisuri = app.config["REDIS_URL"]
+ with Redis.from_url(redisuri, decode_responses=True) as rconn:
+ return func(rconn)
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index 10ba832..c5bf5e5 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -12,7 +12,7 @@ def error_filename(job_id, error_dir):
"Compute the path of the file where errors will be dumped."
return f"{error_dir}/job_{job_id}.error"
-def __init_job__(# pylint: disable=[too-many-arguments]
+def initialise_job(# pylint: disable=[too-many-arguments]
redis_conn: Redis, job_id: str, command: list, job_type: str,
ttl_seconds: int, extra_meta: dict) -> dict:
"Initialise a job 'object' and put in on redis"
@@ -33,7 +33,7 @@ def build_file_verification_job(
sys.executable, "-m", "scripts.validate_file", filetype, filepath, redisurl,
job_id
]
- return __init_job__(
+ return initialise_job(
redis_conn, job_id, command, "file-verification", ttl_seconds, {
"filetype": filetype,
"filename": os.path.basename(filepath), "percent": 0
@@ -48,7 +48,7 @@ def data_insertion_job(# pylint: disable=[too-many-arguments]
sys.executable, "-m", "scripts.insert_data", filetype, filepath,
speciesid, platformid, datasetid, databaseuri, redisuri
]
- return __init_job__(
+ return initialise_job(
redis_conn, str(uuid4()), command, "data-insertion", ttl_seconds, {
"filename": os.path.basename(filepath), "filetype": filetype,
"totallines": totallines
diff --git a/qc_app/samples.py b/qc_app/samples.py
index 88a0fde..e1e88bd 100644
--- a/qc_app/samples.py
+++ b/qc_app/samples.py
@@ -1,9 +1,13 @@
"""Code regarding samples"""
+import os
+import sys
import csv
+import uuid
from pathlib import Path
from typing import Iterator
import MySQLdb as mdb
+from redis import Redis
from MySQLdb.cursors import DictCursor
from flask import (
flash,
@@ -16,9 +20,13 @@ from flask import (
from quality_control.parsing import take
-from .files import save_file
-from .dbinsert import species_by_id, groups_by_species
-from .db_utils import with_db_connection, database_connection
+from qc_app import jobs
+from qc_app.files import save_file
+from qc_app.dbinsert import species_by_id, groups_by_species
+from qc_app.db_utils import (
+ with_db_connection,
+ database_connection,
+ with_redis_connection)
samples = Blueprint("samples", __name__)
@@ -141,6 +149,7 @@ def save_samples_data(conn: mdb.Connection,
file_data: Iterator[dict]):
"""Save the samples to DB."""
data = ({**row, "SpeciesId": speciesid} for row in file_data)
+ total = 0
with conn.cursor() as cursor:
while True:
batch = take(data, 5000)
@@ -152,6 +161,8 @@ def save_samples_data(conn: mdb.Connection,
" %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s"
") ON DUPLICATE KEY UPDATE Name=Name",
batch)
+ total += len(batch)
+ print(f"\tSaved {total} samples total so far.")
def cross_reference_samples(conn: mdb.Connection,
species_id: int,
@@ -162,7 +173,8 @@ def cross_reference_samples(conn: mdb.Connection,
cursor.execute(
"SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s",
(population_id,))
- last_order_id = cursor.fetchone()["loid"]
+ last_order_id = (cursor.fetchone()["loid"] or 10)
+ total = 0
while True:
batch = take(strain_names, 5000)
if len(batch) == 0:
@@ -192,6 +204,24 @@ def cross_reference_samples(conn: mdb.Connection,
")",
params)
last_order_id += (len(params) * 10)
+ total += len(batch)
+ print(f"\t{total} total samples cross-referenced to the population "
+ "so far.")
+
+def build_sample_upload_job(# pylint: disable=[too-many-arguments]
+ speciesid: int,
+ populationid: int,
+ samplesfile: Path,
+ separator: str,
+ firstlineheading: bool,
+ quotechar: str):
+ """Define the async command to run the actual samples data upload."""
+ return [
+ sys.executable, "-m", "scripts.insert_samples", app.config["SQL_URI"],
+ str(speciesid), str(populationid), str(samplesfile.absolute()),
+ separator, f"--redisuri={app.config['REDIS_URL']}",
+ f"--quotechar={quotechar}"
+ ] + (["--firstlineheading"] if firstlineheading else [])
@samples.route("/upload/samples", methods=["POST"])
def upload_samples():
@@ -226,14 +256,67 @@ def upload_samples():
flash("You need to provide a separator character.", "alert-error")
return samples_uploads_page
- save_samples_data(
- conn,
- species["SpeciesId"],
- read_samples_file(samples_file, separator, firstlineheading))
- cross_reference_samples(
- conn,
- species["SpeciesId"],
- population["InbredSetId"],
- (row["Name"] for row in read_samples_file(samples_file, separator, firstlineheading)))
-
- return "SUCCESS: Respond with a better UI than this."
+ quotechar = (request.form.get("field_delimiter", '"') or '"')
+
+ redisuri = app.config["REDIS_URL"]
+ with Redis.from_url(redisuri, decode_responses=True) as rconn:
+ the_job = jobs.launch_job(
+ jobs.initialise_job(
+ rconn,
+ str(uuid.uuid4()),
+ build_sample_upload_job(
+ species["SpeciesId"],
+ population["InbredSetId"],
+ samples_file,
+ separator,
+ firstlineheading,
+ quotechar),
+ "samples_upload",
+ app.config["JOBS_TTL_SECONDS"],
+ {"job_name": f"Samples Upload: {samples_file.name}"}),
+ redisuri,
+ f"{app.config['UPLOAD_FOLDER']}/job_errors")
+ return redirect(url_for(
+ "samples.upload_status", job_id=the_job["job_id"]))
+
+@samples.route("/upload/status/<uuid:job_id>", methods=["GET"])
+def upload_status(job_id: uuid.UUID):
+ """Check on the status of a samples upload job."""
+ job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id)))
+ if job:
+ status = job["status"]
+ if status == "success":
+ return render_template("samples/upload-success.html", job=job)
+
+ if status == "error":
+ return redirect(url_for("samples.upload_failure", job_id=job_id))
+
+ error_filename = Path(jobs.error_filename(
+ job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+ if error_filename.exists():
+ stat = os.stat(error_filename)
+ if stat.st_size > 0:
+ return redirect(url_for(
+ "samples.upload_failure", job_id=job_id))
+
+ return render_template(
+ "samples/upload-progress.html",
+ job=job) # maybe also handle this?
+
+ return render_template("no_such_job.html", job_id=job_id), 400
+
+@samples.route("/upload/failure/<uuid:job_id>", methods=["GET"])
+def upload_failure(job_id: uuid.UUID):
+ """Display the errors of the samples upload failure."""
+ job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id)))
+ if not bool(job):
+ return render_template("no_such_job.html", job_id=job_id), 400
+
+ error_filename = Path(jobs.error_filename(
+ job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+ if error_filename.exists():
+ stat = os.stat(error_filename)
+ if stat.st_size > 0:
+ return render_template("worker_failure.html", job_id=job_id)
+
+ return render_template("samples/upload-failure.html", job=job)
diff --git a/qc_app/static/css/styles.css b/qc_app/static/css/styles.css
index c9f6737..4c48a8a 100644
--- a/qc_app/static/css/styles.css
+++ b/qc_app/static/css/styles.css
@@ -170,5 +170,6 @@ form fieldset legend {
padding: 1em;
font-weight: bold;
border-radius: 0.8em;
- width: 55em;
+ width: 90%;
+ overflow: scroll;
}
diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py
index 43c6a38..6b6faf1 100644
--- a/scripts/insert_samples.py
+++ b/scripts/insert_samples.py
@@ -42,21 +42,25 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
firstlineheading: bool,
quotechar: str):
"""Insert the samples into the database."""
+ print("Checking for errors:")
species = species_by_id(conn, speciesid)
if not bool(species):
logging.error("Species with id '%s' does not exist.", str(speciesid))
return 1
+ print(f"\tSpecies with ID '{speciesid}' found")
population = population_by_id(conn, populationid)
if not bool(population):
logging.error("Population with id '%s' does not exist.",
str(populationid))
return 1
- logging.info("Inserting samples ...")
+ print(f"\tPopulations with ID '{populationid}' found")
+ print("No errors found. Continuing...")
+ print("\nInserting samples ...")
save_samples_data(
conn,
speciesid,
read_samples_file(samplesfile, separator, firstlineheading))
- logging.info("Cross-referencing samples with their populations.")
+ print("Cross-referencing samples with their populations.")
cross_reference_samples(
conn,
speciesid,
@@ -66,7 +70,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
separator,
firstlineheading,
quotechar=quotechar)))
-
+ print("Samples upload successfully completed.")
return 0
if __name__ == "__main__":
@@ -133,8 +137,6 @@ if __name__ == "__main__":
with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
database_connection(args.databaseuri) as dbconn):
- print("We got here...")
- print(args)
return insert_samples(dbconn,
rconn,
args.speciesid,