about summary refs log tree commit diff
path: root/uploader/samples.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-07-25 11:07:33 -0500
committerFrederick Muriuki Muriithi2024-07-25 14:34:09 -0500
commit754e8f214b940e05298cb360ed829f5c685d55a5 (patch)
tree62c2c5b601746621f0949b38937ad232f006dee2 /uploader/samples.py
parentde9e1b9fe37928b864bea28b408de6c14d04526b (diff)
downloadgn-uploader-754e8f214b940e05298cb360ed829f5c685d55a5.tar.gz
Rename module: qc_app --> uploader
Diffstat (limited to 'uploader/samples.py')
-rw-r--r--uploader/samples.py354
1 files changed, 354 insertions, 0 deletions
diff --git a/uploader/samples.py b/uploader/samples.py
new file mode 100644
index 0000000..9c95770
--- /dev/null
+++ b/uploader/samples.py
@@ -0,0 +1,354 @@
+"""Code regarding samples"""
+import os
+import sys
+import csv
+import uuid
+from pathlib import Path
+from typing import Iterator
+
+import MySQLdb as mdb
+from redis import Redis
+from MySQLdb.cursors import DictCursor
+from flask import (
+    flash,
+    request,
+    url_for,
+    redirect,
+    Blueprint,
+    render_template,
+    current_app as app)
+
+from functional_tools import take
+
+from uploader import jobs
+from uploader.files import save_file
+from uploader.input_validation import is_integer_input
+from uploader.db_utils import (
+    with_db_connection,
+    database_connection,
+    with_redis_connection)
+from uploader.db import (
+    species_by_id,
+    save_population,
+    population_by_id,
+    populations_by_species,
+    species as fetch_species)
+
+samples = Blueprint("samples", __name__)
+
+@samples.route("/upload/species", methods=["GET", "POST"])
+def select_species():
+    """Select the species."""
+    if request.method == "GET":
+        return render_template("samples/select-species.html",
+                               species=with_db_connection(fetch_species))
+
+    index_page = redirect(url_for("entry.upload_file"))
+    species_id = request.form.get("species_id")
+    if bool(species_id):
+        species_id = int(species_id)
+        species = with_db_connection(
+            lambda conn: species_by_id(conn, species_id))
+        if bool(species):
+            return redirect(url_for(
+                "samples.select_population", species_id=species_id))
+        flash("Invalid species selected!", "alert-error")
+    flash("You need to select a species", "alert-error")
+    return index_page
+
+@samples.route("/upload/species/<int:species_id>/create-population",
+               methods=["POST"])
+def create_population(species_id: int):
+    """Create new grouping/population."""
+    if not is_integer_input(species_id):
+        flash("You did not provide a valid species. Please select one to "
+              "continue.",
+              "alert-danger")
+        return redirect(url_for("samples.select_species"))
+    species = with_db_connection(lambda conn: species_by_id(conn, species_id))
+    if not bool(species):
+        flash("Species with given ID was not found.", "alert-danger")
+        return redirect(url_for("samples.select_species"))
+
+    species_page = redirect(url_for("samples.select_species"), code=307)
+    with database_connection(app.config["SQL_URI"]) as conn:
+        species = species_by_id(conn, species_id)
+        pop_name = request.form.get("inbredset_name", "").strip()
+        pop_fullname = request.form.get("inbredset_fullname", "").strip()
+
+        if not bool(species):
+            flash("Invalid species!", "alert-error error-create-population")
+            return species_page
+        if (not bool(pop_name)) or (not bool(pop_fullname)):
+            flash("You *MUST* provide a grouping/population name",
+                  "alert-error error-create-population")
+            return species_page
+
+        pop = save_population(conn, {
+            "SpeciesId": species["SpeciesId"],
+            "Name": pop_name,
+            "InbredSetName": pop_fullname,
+            "FullName": pop_fullname,
+            "Family": request.form.get("inbredset_family") or None,
+            "Description": request.form.get("description") or None
+        })
+
+        flash("Grouping/Population created successfully.", "alert-success")
+        return redirect(url_for("samples.upload_samples",
+                                species_id=species_id,
+                                population_id=pop["population_id"]))
+
+@samples.route("/upload/species/<int:species_id>/population",
+               methods=["GET", "POST"])
+def select_population(species_id: int):
+    """Select from existing groupings/populations."""
+    if not is_integer_input(species_id):
+        flash("You did not provide a valid species. Please select one to "
+              "continue.",
+              "alert-danger")
+        return redirect(url_for("samples.select_species"))
+    species = with_db_connection(lambda conn: species_by_id(conn, species_id))
+    if not bool(species):
+        flash("Species with given ID was not found.", "alert-danger")
+        return redirect(url_for("samples.select_species"))
+
+    if request.method == "GET":
+        return render_template(
+            "samples/select-population.html",
+            species=species,
+            populations=with_db_connection(
+                lambda conn: populations_by_species(conn, species_id)))
+
+    population_page = redirect(url_for(
+        "samples.select_population", species_id=species_id), code=307)
+    _population_id = request.form.get("inbredset_id")
+    if not is_integer_input(_population_id):
+        flash("You did not provide a valid population. Please select one to "
+              "continue.",
+              "alert-danger")
+        return population_page
+    population = with_db_connection(
+        lambda conn: population_by_id(conn, _population_id))
+    if not bool(population):
+        flash("Invalid grouping/population!",
+              "alert-error error-select-population")
+        return population_page
+
+    return redirect(url_for("samples.upload_samples",
+                            species_id=species_id,
+                            population_id=_population_id),
+                    code=307)
+
+def read_samples_file(filepath, separator: str, firstlineheading: bool, **kwargs) -> Iterator[dict]:
+    """Read the samples file."""
+    with open(filepath, "r", encoding="utf-8") as inputfile:
+        reader = csv.DictReader(
+            inputfile,
+            fieldnames=(
+                None if firstlineheading
+                else ("Name", "Name2", "Symbol", "Alias")),
+            delimiter=separator,
+            quotechar=kwargs.get("quotechar", '"'))
+        for row in reader:
+            yield row
+
+def save_samples_data(conn: mdb.Connection,
+                      speciesid: int,
+                      file_data: Iterator[dict]):
+    """Save the samples to DB."""
+    data = ({**row, "SpeciesId": speciesid} for row in file_data)
+    total = 0
+    with conn.cursor() as cursor:
+        while True:
+            batch = take(data, 5000)
+            if len(batch) == 0:
+                break
+            cursor.executemany(
+                "INSERT INTO Strain(Name, Name2, SpeciesId, Symbol, Alias) "
+                "VALUES("
+                "    %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s"
+                ") ON DUPLICATE KEY UPDATE Name=Name",
+                batch)
+            total += len(batch)
+            print(f"\tSaved {total} samples total so far.")
+
+def cross_reference_samples(conn: mdb.Connection,
+                            species_id: int,
+                            population_id: int,
+                            strain_names: Iterator[str]):
+    """Link samples to their population."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(
+            "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s",
+            (population_id,))
+        last_order_id = (cursor.fetchone()["loid"] or 10)
+        total = 0
+        while True:
+            batch = take(strain_names, 5000)
+            if len(batch) == 0:
+                break
+            params_str = ", ".join(["%s"] * len(batch))
+            ## This query is slow -- investigate.
+            cursor.execute(
+                "SELECT s.Id FROM Strain AS s LEFT JOIN StrainXRef AS sx "
+                "ON s.Id = sx.StrainId WHERE s.SpeciesId=%s AND s.Name IN "
+                f"({params_str}) AND sx.StrainId IS NULL",
+                (species_id,) + tuple(batch))
+            strain_ids = (sid["Id"] for sid in cursor.fetchall())
+            params = tuple({
+                "pop_id": population_id,
+                "strain_id": strain_id,
+                "order_id": last_order_id + (order_id * 10),
+                "mapping": "N",
+                "pedigree": None
+            } for order_id, strain_id in enumerate(strain_ids, start=1))
+            cursor.executemany(
+                "INSERT INTO StrainXRef( "
+                "  InbredSetId, StrainId, OrderId, Used_for_mapping, PedigreeStatus"
+                ")"
+                "VALUES ("
+                "  %(pop_id)s, %(strain_id)s, %(order_id)s, %(mapping)s, "
+                "  %(pedigree)s"
+                ")",
+                params)
+            last_order_id += (len(params) * 10)
+            total += len(batch)
+            print(f"\t{total} total samples cross-referenced to the population "
+                  "so far.")
+
+def build_sample_upload_job(# pylint: disable=[too-many-arguments]
+        speciesid: int,
+        populationid: int,
+        samplesfile: Path,
+        separator: str,
+        firstlineheading: bool,
+        quotechar: str):
+    """Define the async command to run the actual samples data upload."""
+    return [
+        sys.executable, "-m", "scripts.insert_samples", app.config["SQL_URI"],
+        str(speciesid), str(populationid), str(samplesfile.absolute()),
+        separator, f"--redisuri={app.config['REDIS_URL']}",
+        f"--quotechar={quotechar}"
+    ] + (["--firstlineheading"] if firstlineheading else [])
+
+@samples.route("/upload/species/<int:species_id>/populations/<int:population_id>/samples",
+               methods=["GET", "POST"])
+def upload_samples(species_id: int, population_id: int):#pylint: disable=[too-many-return-statements]
+    """Upload the samples."""
+    samples_uploads_page = redirect(url_for("samples.upload_samples",
+                                            species_id=species_id,
+                                            population_id=population_id))
+    if not is_integer_input(species_id):
+        flash("You did not provide a valid species. Please select one to "
+              "continue.",
+              "alert-danger")
+        return redirect(url_for("samples.select_species"))
+    species = with_db_connection(lambda conn: species_by_id(conn, species_id))
+    if not bool(species):
+        flash("Species with given ID was not found.", "alert-danger")
+        return redirect(url_for("samples.select_species"))
+
+    if not is_integer_input(population_id):
+        flash("You did not provide a valid population. Please select one "
+              "to continue.",
+              "alert-danger")
+        return redirect(url_for("samples.select_population",
+                                species_id=species_id),
+                        code=307)
+    population = with_db_connection(
+        lambda conn: population_by_id(conn, int(population_id)))
+    if not bool(population):
+        flash("Invalid grouping/population!", "alert-error")
+        return redirect(url_for("samples.select_population",
+                                species_id=species_id),
+                        code=307)
+
+    if request.method == "GET" or request.files.get("samples_file") is None:
+        return render_template("samples/upload-samples.html",
+                               species=species,
+                               population=population)
+
+    try:
+        samples_file = save_file(request.files["samples_file"],
+                                 Path(app.config["UPLOAD_FOLDER"]))
+    except AssertionError:
+        flash("You need to provide a file with the samples data.",
+              "alert-error")
+        return samples_uploads_page
+
+    firstlineheading = (request.form.get("first_line_heading") == "on")
+
+    separator = request.form.get("separator", ",")
+    if separator == "other":
+        separator = request.form.get("other_separator", ",")
+    if not bool(separator):
+        flash("You need to provide a separator character.", "alert-error")
+        return samples_uploads_page
+
+    quotechar = (request.form.get("field_delimiter", '"') or '"')
+
+    redisuri = app.config["REDIS_URL"]
+    with Redis.from_url(redisuri, decode_responses=True) as rconn:
+        the_job = jobs.launch_job(
+            jobs.initialise_job(
+                rconn,
+                jobs.jobsnamespace(),
+                str(uuid.uuid4()),
+                build_sample_upload_job(
+                    species["SpeciesId"],
+                    population["InbredSetId"],
+                    samples_file,
+                    separator,
+                    firstlineheading,
+                    quotechar),
+                "samples_upload",
+                app.config["JOBS_TTL_SECONDS"],
+                {"job_name": f"Samples Upload: {samples_file.name}"}),
+            redisuri,
+            f"{app.config['UPLOAD_FOLDER']}/job_errors")
+        return redirect(url_for(
+            "samples.upload_status", job_id=the_job["jobid"]))
+
+@samples.route("/upload/status/<uuid:job_id>", methods=["GET"])
+def upload_status(job_id: uuid.UUID):
+    """Check on the status of a samples upload job."""
+    job = with_redis_connection(lambda rconn: jobs.job(
+        rconn, jobs.jobsnamespace(), job_id))
+    if job:
+        status = job["status"]
+        if status == "success":
+            return render_template("samples/upload-success.html", job=job)
+
+        if status == "error":
+            return redirect(url_for("samples.upload_failure", job_id=job_id))
+
+        error_filename = Path(jobs.error_filename(
+            job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+        if error_filename.exists():
+            stat = os.stat(error_filename)
+            if stat.st_size > 0:
+                return redirect(url_for(
+                    "samples.upload_failure", job_id=job_id))
+
+        return render_template(
+            "samples/upload-progress.html",
+            job=job) # maybe also handle this?
+
+    return render_template("no_such_job.html", job_id=job_id), 400
+
+@samples.route("/upload/failure/<uuid:job_id>", methods=["GET"])
+def upload_failure(job_id: uuid.UUID):
+    """Display the errors of the samples upload failure."""
+    job = with_redis_connection(lambda rconn: jobs.job(
+        rconn, jobs.jobsnamespace(), job_id))
+    if not bool(job):
+        return render_template("no_such_job.html", job_id=job_id), 400
+
+    error_filename = Path(jobs.error_filename(
+        job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
+    if error_filename.exists():
+        stat = os.stat(error_filename)
+        if stat.st_size > 0:
+            return render_template("worker_failure.html", job_id=job_id)
+
+    return render_template("samples/upload-failure.html", job=job)