-rw-r--r--   qc_app/db_utils.py             |   9
-rw-r--r--   qc_app/jobs.py                 |   6
-rw-r--r--   qc_app/samples.py              | 113
-rw-r--r--   qc_app/static/css/styles.css   |   3
-rw-r--r--   scripts/insert_samples.py      |  12
5 files changed, 118 insertions(+), 25 deletions(-)
diff --git a/qc_app/db_utils.py b/qc_app/db_utils.py
index 75b6b73..ef26398 100644
--- a/qc_app/db_utils.py
+++ b/qc_app/db_utils.py
@@ -2,10 +2,11 @@
 import logging
 import traceback
 import contextlib
+from urllib.parse import urlparse
 from typing import Any, Tuple, Optional, Iterator, Callable
 
-from urllib.parse import urlparse
 import MySQLdb as mdb
+from redis import Redis
 from flask import current_app as app
 
 def parse_db_url(db_url) -> Tuple:
@@ -37,3 +38,9 @@ def with_db_connection(func: Callable[[mdb.Connection], Any]) -> Any:
     """Call `func` with a MySQDdb database connection."""
     with database_connection(app.config["SQL_URI"]) as conn:
         return func(conn)
+
+def with_redis_connection(func: Callable[[Redis], Any]) -> Any:
+    """Call `func` with a redis connection."""
+    redisuri = app.config["REDIS_URL"]
+    with Redis.from_url(redisuri, decode_responses=True) as rconn:
+        return func(rconn)
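
The new with_redis_connection helper mirrors the existing with_db_connection wrapper: it builds a client from the app's REDIS_URL setting, hands it to the callback, and closes it when the callback returns. A minimal usage sketch, assuming a Flask application context and a purely illustrative key name:

# Illustrative only: fetch a value without managing the Redis connection by hand.
from qc_app.db_utils import with_redis_connection

def fetch_example_value():
    """Return the value stored under a hypothetical 'example:key' key."""
    return with_redis_connection(lambda rconn: rconn.get("example:key"))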
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index 10ba832..c5bf5e5 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -12,7 +12,7 @@ def error_filename(job_id, error_dir):
     "Compute the path of the file where errors will be dumped."
     return f"{error_dir}/job_{job_id}.error"
 
-def __init_job__(# pylint: disable=[too-many-arguments]
+def initialise_job(# pylint: disable=[too-many-arguments]
         redis_conn: Redis, job_id: str, command: list, job_type: str,
         ttl_seconds: int, extra_meta: dict) -> dict:
     "Initialise a job 'object' and put in on redis"
@@ -33,7 +33,7 @@ def build_file_verification_job(
         sys.executable, "-m", "scripts.validate_file", filetype, filepath, redisurl,
         job_id
     ]
-    return __init_job__(
+    return initialise_job(
         redis_conn, job_id, command, "file-verification", ttl_seconds, {
             "filetype": filetype,
             "filename": os.path.basename(filepath), "percent": 0
@@ -48,7 +48,7 @@ def data_insertion_job(# pylint: disable=[too-many-arguments]
         sys.executable, "-m", "scripts.insert_data", filetype, filepath,
         speciesid, platformid, datasetid, databaseuri, redisuri
     ]
-    return __init_job__(
+    return initialise_job(
         redis_conn, str(uuid4()), command, "data-insertion", ttl_seconds, {
             "filename": os.path.basename(filepath), "filetype": filetype,
             "totallines": totallines
diff --git a/qc_app/samples.py b/qc_app/samples.py
index 88a0fde..e1e88bd 100644
--- a/qc_app/samples.py
+++ b/qc_app/samples.py
@@ -1,9 +1,13 @@
 """Code regarding samples"""
+import os
+import sys
 import csv
+import uuid
 from pathlib import Path
 from typing import Iterator
 
 import MySQLdb as mdb
+from redis import Redis
 from MySQLdb.cursors import DictCursor
 from flask import (
     flash,
@@ -16,9 +20,13 @@ from flask import (
 
 from quality_control.parsing import take
 
-from .files import save_file
-from .dbinsert import species_by_id, groups_by_species
-from .db_utils import with_db_connection, database_connection
+from qc_app import jobs
+from qc_app.files import save_file
+from qc_app.dbinsert import species_by_id, groups_by_species
+from qc_app.db_utils import (
+    with_db_connection,
+    database_connection,
+    with_redis_connection)
 
 samples = Blueprint("samples", __name__)
 
@@ -141,6 +149,7 @@ def save_samples_data(conn: mdb.Connection,
                       file_data: Iterator[dict]):
     """Save the samples to DB."""
     data = ({**row, "SpeciesId": speciesid} for row in file_data)
+    total = 0
     with conn.cursor() as cursor:
         while True:
             batch = take(data, 5000)
@@ -152,6 +161,8 @@ def save_samples_data(conn: mdb.Connection,
                 "    %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s"
                 ") ON DUPLICATE KEY UPDATE Name=Name",
                 batch)
+            total += len(batch)
+            print(f"\tSaved {total} samples total so far.")
 
 def cross_reference_samples(conn: mdb.Connection,
                             species_id: int,
@@ -162,7 +173,8 @@ def cross_reference_samples(conn: mdb.Connection,
         cursor.execute(
             "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s",
             (population_id,))
-        last_order_id = cursor.fetchone()["loid"]
+        last_order_id = (cursor.fetchone()["loid"] or 10)
+        total = 0
         while True:
             batch = take(strain_names, 5000)
             if len(batch) == 0:
@@ -192,6 +204,24 @@ def cross_reference_samples(conn: mdb.Connection,
                 ")",
                 params)
             last_order_id += (len(params) * 10)
+            total += len(batch)
+            print(f"\t{total} total samples cross-referenced to the population "
+                  "so far.")
+
+def build_sample_upload_job(# pylint: disable=[too-many-arguments]
+        speciesid: int,
+        populationid: int,
+        samplesfile: Path,
+        separator: str,
+        firstlineheading: bool,
+        quotechar: str):
+    """Define the async command to run the actual samples data upload."""
+    return [
+        sys.executable, "-m", "scripts.insert_samples", app.config["SQL_URI"],
+        str(speciesid), str(populationid), str(samplesfile.absolute()),
+        separator, f"--redisuri={app.config['REDIS_URL']}",
+        f"--quotechar={quotechar}"
+    ] + (["--firstlineheading"] if firstlineheading else [])
 
 @samples.route("/upload/samples", methods=["POST"])
 def upload_samples():
@@ -226,14 +256,67 @@ def upload_samples():
             flash("You need to provide a separator character.", "alert-error")
             return samples_uploads_page
 
-        save_samples_data(
-            conn,
-            species["SpeciesId"],
-            read_samples_file(samples_file, separator, firstlineheading))
-        cross_reference_samples(
-            conn,
-            species["SpeciesId"],
-            population["InbredSetId"],
-            (row["Name"] for row in read_samples_file(samples_file, separator, firstlineheading)))
-
-        return "SUCCESS: Respond with a better UI than this."
+        quotechar = (request.form.get("field_delimiter", '"') or '"')
+
+        redisuri = app.config["REDIS_URL"]
+        with Redis.from_url(redisuri, decode_responses=True) as rconn:
+            the_job = jobs.launch_job(
+                jobs.initialise_job(
+                    rconn,
+                    str(uuid.uuid4()),
+                    build_sample_upload_job(
+                        species["SpeciesId"],
+                        population["InbredSetId"],
+                        samples_file,
+                        separator,
+                        firstlineheading,
+                        quotechar),
+                    "samples_upload",
+                    app.config["JOBS_TTL_SECONDS"],
+                    {"job_name": f"Samples Upload: {samples_file.name}"}),
+                redisuri,
+                f"{app.config['UPLOAD_FOLDER']}/job_errors")
+            return redirect(url_for(
+                "samples.upload_status", job_id=the_job["job_id"]))
+
+@samples.route("/upload/status/<uuid:job_id>", methods=["GET"])
+def upload_status(job_id: uuid.UUID):
+    """Check on the status of a samples upload job."""
+    job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id)))
+    if job:
+        status = job["status"]
+        if status == "success":
+            return render_template("samples/upload-success.html", job=job)
+
+        if status == "error":
+            return redirect(url_for("samples.upload_failure", job_id=job_id))
+
+        error_filename = Path(jobs.error_filename(
+            job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+        if error_filename.exists():
+            stat = os.stat(error_filename)
+            if stat.st_size > 0:
+                return redirect(url_for(
+                    "samples.upload_failure", job_id=job_id))
+
+        return render_template(
+            "samples/upload-progress.html",
+            job=job)  # TODO: consider explicitly handling other job statuses
+
+    return render_template("no_such_job.html", job_id=job_id), 400
+
+@samples.route("/upload/failure/<uuid:job_id>", methods=["GET"])
+def upload_failure(job_id: uuid.UUID):
+    """Display the errors of the samples upload failure."""
+    job = with_redis_connection(lambda rconn: jobs.job(rconn, str(job_id)))
+    if not bool(job):
+        return render_template("no_such_job.html", job_id=job_id), 400
+
+    error_filename = Path(jobs.error_filename(
+        job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+    if error_filename.exists():
+        stat = os.stat(error_filename)
+        if stat.st_size > 0:
+            return render_template("worker_failure.html", job_id=job_id)
+
+    return render_template("samples/upload-failure.html", job=job)
diff --git a/qc_app/static/css/styles.css b/qc_app/static/css/styles.css
index c9f6737..4c48a8a 100644
--- a/qc_app/static/css/styles.css
+++ b/qc_app/static/css/styles.css
@@ -170,5 +170,6 @@ form fieldset legend {
     padding: 1em;
     font-weight: bold;
     border-radius: 0.8em;
-    width: 55em;
+    width: 90%;
+    overflow: scroll;
 }
diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py
index 43c6a38..6b6faf1 100644
--- a/scripts/insert_samples.py
+++ b/scripts/insert_samples.py
@@ -42,21 +42,25 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
                    firstlineheading: bool,
                    quotechar: str):
     """Insert the samples into the database."""
+    print("Checking for errors:")
     species = species_by_id(conn, speciesid)
     if not bool(species):
         logging.error("Species with id '%s' does not exist.", str(speciesid))
         return 1
+    print(f"\tSpecies with ID '{speciesid}' found")
     population = population_by_id(conn, populationid)
     if not bool(population):
         logging.error("Population with id '%s' does not exist.",
                       str(populationid))
         return 1
-    logging.info("Inserting samples ...")
+    print(f"\tPopulations with ID '{populationid}' found")
+    print("No errors found. Continuing...")
+    print("\nInserting samples ...")
     save_samples_data(
         conn,
         speciesid,
         read_samples_file(samplesfile, separator, firstlineheading))
-    logging.info("Cross-referencing samples with their populations.")
+    print("Cross-referencing samples with their populations.")
     cross_reference_samples(
         conn,
         speciesid,
@@ -66,7 +70,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
                            separator,
                            firstlineheading,
                            quotechar=quotechar)))
-
+    print("Samples upload successfully completed.")
     return 0
 
 if __name__ == "__main__":
@@ -133,8 +137,6 @@ if __name__ == "__main__":
 
         with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
               database_connection(args.databaseuri) as dbconn):
-            print("We got here...")
-            print(args)
             return insert_samples(dbconn,
                                   rconn,
                                   args.speciesid,