aboutsummaryrefslogtreecommitdiff
path: root/uploader/samples
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/samples')
-rw-r--r-- uploader/samples/__init__.py | 1
-rw-r--r-- uploader/samples/models.py | 104
-rw-r--r-- uploader/samples/views.py | 280
3 files changed, 385 insertions, 0 deletions
diff --git a/uploader/samples/__init__.py b/uploader/samples/__init__.py
new file mode 100644
index 0000000..1bd6d2d
--- /dev/null
+++ b/uploader/samples/__init__.py
@@ -0,0 +1 @@
+"""Samples package. Handle samples uploads and editing."""
diff --git a/uploader/samples/models.py b/uploader/samples/models.py
new file mode 100644
index 0000000..d7d5384
--- /dev/null
+++ b/uploader/samples/models.py
@@ -0,0 +1,104 @@
+"""Functions for handling samples."""
+import csv
+from typing import Iterator
+
+import MySQLdb as mdb
+from MySQLdb.cursors import DictCursor
+
+from functional_tools import take
+
def samples_by_species_and_population(
        conn: mdb.Connection,
        species_id: int,
        population_id: int
) -> tuple[dict, ...]:
    """Return the samples linked to the given species and population.

    Each returned row is a dict holding the population's `InbredSetId`
    together with every column of the matching `Strain` record. Rows are
    selected by joining `InbredSet` to `Strain` through `StrainXRef`.
    """
    query = (
        "SELECT iset.InbredSetId, s.* FROM InbredSet AS iset "
        "INNER JOIN StrainXRef AS sxr ON iset.InbredSetId=sxr.InbredSetId "
        "INNER JOIN Strain AS s ON sxr.StrainId=s.Id "
        "WHERE s.SpeciesId=%(species_id)s "
        "AND iset.InbredSetId=%(population_id)s")
    query_args = {"species_id": species_id, "population_id": population_id}
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(query, query_args)
        samples = tuple(cursor.fetchall())
        return samples
+
+
def read_samples_file(filepath, separator: str, firstlineheading: bool, **kwargs) -> Iterator[dict]:
    """Lazily read the samples file, yielding one dict per data row.

    Parameters:
        filepath: Path to the delimited text file (read as UTF-8).
        separator: The character separating the fields in each row.
        firstlineheading: When true, the first line of the file provides the
            field names. Otherwise the columns are taken to be, in order,
            "Name", "Name2", "Symbol" and "Alias".
        **kwargs: Only `quotechar` is consulted; it defaults to '"'.

    The file is only opened once iteration starts, and is closed when the
    iterator is exhausted or discarded.
    """
    fieldnames = (
        None if firstlineheading
        else ("Name", "Name2", "Symbol", "Alias"))
    # The csv module requires files to be opened with newline="" so that line
    # endings embedded inside quoted fields reach the parser untranslated.
    with open(filepath, "r", encoding="utf-8", newline="") as inputfile:
        yield from csv.DictReader(
            inputfile,
            fieldnames=fieldnames,
            delimiter=separator,
            quotechar=kwargs.get("quotechar", '"'))
+
+
def save_samples_data(conn: mdb.Connection,
                      speciesid: int,
                      file_data: Iterator[dict]):
    """Write the samples into the `Strain` table, 5000 rows at a time.

    Every row from `file_data` is tagged with the given `speciesid` before
    being inserted. Rows that collide with an existing key are left as they
    are (the `ON DUPLICATE KEY UPDATE Name=Name` clause is a no-op update).
    A running total is printed after each batch.
    """
    insert_query = (
        "INSERT INTO Strain(Name, Name2, SpeciesId, Symbol, Alias) "
        "VALUES("
        " %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s"
        ") ON DUPLICATE KEY UPDATE Name=Name")
    rows = ({**row, "SpeciesId": speciesid} for row in file_data)
    saved = 0
    with conn.cursor() as cursor:
        batch = take(rows, 5000)
        while len(batch) > 0:
            cursor.executemany(insert_query, batch)
            saved += len(batch)
            print(f"\tSaved {saved} samples total so far.")
            batch = take(rows, 5000)
+
+
def cross_reference_samples(conn: mdb.Connection,
                            species_id: int,
                            population_id: int,
                            strain_names: Iterator[str]):
    """Attach the named samples to the population via `StrainXRef`.

    The names are consumed in batches of 5000. For each batch, only strains
    of the given species that have no `StrainXRef` row at all are selected
    and linked. New links receive `OrderId` values that continue, in steps
    of 10, from the population's current maximum (or from 10 when the
    population has none).

    The progress message counts the names processed so far, which can be
    larger than the number of rows actually inserted.
    """
    insert_query = (
        "INSERT INTO StrainXRef( "
        " InbredSetId, StrainId, OrderId, Used_for_mapping, PedigreeStatus"
        ")"
        "VALUES ("
        " %(pop_id)s, %(strain_id)s, %(order_id)s, %(mapping)s, "
        " %(pedigree)s"
        ")")
    processed = 0
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(
            "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s",
            (population_id,))
        last_order_id = (cursor.fetchone()["loid"] or 10)

        batch = take(strain_names, 5000)
        while len(batch) > 0:
            placeholders = ", ".join(["%s"] * len(batch))
            ## This query is slow -- investigate.
            cursor.execute(
                "SELECT s.Id FROM Strain AS s LEFT JOIN StrainXRef AS sx "
                "ON s.Id = sx.StrainId WHERE s.SpeciesId=%s AND s.Name IN "
                f"({placeholders}) AND sx.StrainId IS NULL",
                (species_id,) + tuple(batch))
            unlinked = cursor.fetchall()
            new_links = tuple(
                {
                    "pop_id": population_id,
                    "strain_id": row["Id"],
                    "order_id": last_order_id + (position * 10),
                    "mapping": "N",
                    "pedigree": None
                }
                for position, row in enumerate(unlinked, start=1))
            cursor.executemany(insert_query, new_links)
            # Advance past the order ids handed out in this batch.
            last_order_id += (len(new_links) * 10)
            processed += len(batch)
            print(f"\t{processed} total samples cross-referenced to the population "
                  "so far.")
            batch = take(strain_names, 5000)
diff --git a/uploader/samples/views.py b/uploader/samples/views.py
new file mode 100644
index 0000000..ed79101
--- /dev/null
+++ b/uploader/samples/views.py
@@ -0,0 +1,280 @@
+"""Code regarding samples"""
+import os
+import sys
+import uuid
+from pathlib import Path
+
+from redis import Redis
+from flask import (flash,
+ request,
+ url_for,
+ redirect,
+ Blueprint,
+ current_app as app)
+
+from uploader import jobs
+from uploader.files import save_file
+from uploader.ui import make_template_renderer
+from uploader.authorisation import require_login
+from uploader.request_checks import with_population
+from uploader.input_validation import is_integer_input
+from uploader.datautils import safe_int, order_by_family, enumerate_sequence
+from uploader.population.models import population_by_id, populations_by_species
+from uploader.db_utils import (with_db_connection,
+ database_connection,
+ with_redis_connection)
+from uploader.species.models import (all_species,
+ species_by_id,
+ order_species_by_family)
+
+from .models import samples_by_species_and_population
+
+samplesbp = Blueprint("samples", __name__)
+render_template = make_template_renderer("samples")
+
@samplesbp.route("/samples", methods=["GET"])
@require_login
def index():
    """Entry point for samples handling: pick the species to work with.

    Without a `species_id` query parameter, the species-selection page is
    rendered. With one, the user is redirected to population selection for
    that species, or back here with an error if no such species exists.
    """
    with database_connection(app.config["SQL_URI"]) as conn:
        species_id = request.args.get("species_id")
        if not species_id:
            return render_template(
                "samples/index.html",
                species=order_species_by_family(all_species(conn)),
                activelink="samples")

        species = species_by_id(conn, species_id)
        if species:
            return redirect(url_for(
                "species.populations.samples.select_population",
                species_id=species["SpeciesId"]))

        flash("No such species!", "alert-danger")
        return redirect(url_for("species.populations.samples.index"))
+
+
@samplesbp.route("<int:species_id>/samples/select-population", methods=["GET"])
@require_login
def select_population(species_id: int):
    """Pick the population whose samples are to be handled.

    Without a `population_id` query parameter, the selection page listing
    the species' populations is rendered. With one, the user is redirected
    to that population's samples list, or back here with an error if the
    population does not exist.
    """
    with database_connection(app.config["SQL_URI"]) as conn:
        species = species_by_id(conn, species_id)
        if not species:
            flash("Invalid species!", "alert-danger")
            return redirect(url_for("species.populations.samples.index"))

        chosen_population = request.args.get("population_id")
        if not chosen_population:
            populations = order_by_family(
                populations_by_species(conn, species_id),
                order_key="FamilyOrder")
            return render_template("samples/select-population.html",
                                   species=species,
                                   populations=populations,
                                   activelink="samples")

        population = population_by_id(conn, chosen_population)
        if population:
            return redirect(url_for("species.populations.samples.list_samples",
                                    species_id=species_id,
                                    population_id=population["Id"]))

        flash("Population not found!", "alert-danger")
        return redirect(url_for(
            "species.populations.samples.select_population",
            species_id=species_id))
+
@samplesbp.route("<int:species_id>/populations/<int:population_id>/samples")
@require_login
def list_samples(species_id: int, population_id: int):
    """
    List the samples in a particular population and give the ability to upload
    new ones.

    Paging is controlled by two optional query parameters:

    * `from`: index of the first sample to show; negative values are
      clamped to 0.
    * `count`: number of samples to show; defaults to 20, and falls back to
      that default when the value is not an integer or is negative.
    """
    default_count = 20
    with database_connection(app.config["SQL_URI"]) as conn:
        species = species_by_id(conn, species_id)
        if not bool(species):
            flash("Invalid species!", "alert-danger")
            return redirect(url_for("species.populations.samples.index"))

        population = population_by_id(conn, population_id)
        if not bool(population):
            flash("Population not found!", "alert-danger")
            return redirect(url_for(
                "species.populations.samples.select_population",
                species_id=species_id))

        all_samples = enumerate_sequence(samples_by_species_and_population(
            conn, species_id, population_id))
        total_samples = len(all_samples)
        offset = max(safe_int(request.args.get("from") or 0), 0)
        # `count` comes straight from the query string: a non-numeric value
        # must not crash the page, and a negative one would turn the slice
        # below into "everything except the last N" rather than a page.
        try:
            count = int(request.args.get("count") or default_count)
        except ValueError:
            count = default_count
        if count < 0:
            count = default_count
        return render_template("samples/list-samples.html",
                               species=species,
                               population=population,
                               samples=all_samples[offset:offset+count],
                               offset=offset,
                               count=count,
                               total_samples=total_samples,
                               activelink="list-samples")
+
+
def build_sample_upload_job(# pylint: disable=[too-many-arguments]
        speciesid: int,
        populationid: int,
        samplesfile: Path,
        separator: str,
        firstlineheading: bool,
        quotechar: str):
    """Build the command line for the background samples-upload script.

    The result is an argument list that runs `scripts.insert_samples` with
    the current Python interpreter, passing the database URI, the species
    and population ids, the absolute path of the samples file, the
    separator, the redis URI and the quote character. The
    `--firstlineheading` flag is appended only when `firstlineheading` is
    true.
    """
    command = [
        sys.executable,
        "-m",
        "scripts.insert_samples",
        app.config["SQL_URI"],
        str(speciesid),
        str(populationid),
        str(samplesfile.absolute()),
        separator,
        f"--redisuri={app.config['REDIS_URL']}",
        f"--quotechar={quotechar}",
    ]
    if firstlineheading:
        command.append("--firstlineheading")
    return command
+
+
@samplesbp.route("<int:species_id>/populations/<int:population_id>/upload-samples",
                 methods=["GET", "POST"])
@require_login
def upload_samples(species_id: int, population_id: int):#pylint: disable=[too-many-return-statements]
    """Upload the samples.

    On GET (or a POST without a `samples_file`), render the upload form. On
    a POST with a file, save the file, read the parsing options from the
    form (`first_line_heading`, `separator`/`other_separator`,
    `field_delimiter`), launch the background upload job and redirect to
    the job's status page.

    An invalid or unknown species sends the user back to the samples entry
    point; an invalid or unknown population sends them back to population
    selection.
    """
    samples_uploads_page = redirect(url_for(
        "species.populations.samples.upload_samples",
        species_id=species_id,
        population_id=population_id))
    if not is_integer_input(species_id):
        flash("You did not provide a valid species. Please select one to "
              "continue.",
              "alert-danger")
        # Species selection lives on this blueprint's `index` view, the same
        # endpoint every other view in this module redirects to.
        return redirect(url_for("species.populations.samples.index"))
    species = with_db_connection(lambda conn: species_by_id(conn, species_id))
    if not bool(species):
        flash("Species with given ID was not found.", "alert-danger")
        return redirect(url_for("species.populations.samples.index"))

    if not is_integer_input(population_id):
        flash("You did not provide a valid population. Please select one "
              "to continue.",
              "alert-danger")
        return redirect(url_for("species.populations.samples.select_population",
                                species_id=species_id),
                        code=307)
    population = with_db_connection(
        lambda conn: population_by_id(conn, int(population_id)))
    if not bool(population):
        flash("Invalid grouping/population!", "alert-error")
        return redirect(url_for("species.populations.samples.select_population",
                                species_id=species_id),
                        code=307)

    if request.method == "GET" or request.files.get("samples_file") is None:
        return render_template("samples/upload-samples.html",
                               species=species,
                               population=population)

    try:
        samples_file = save_file(request.files["samples_file"],
                                 Path(app.config["UPLOAD_FOLDER"]))
    except AssertionError:
        flash("You need to provide a file with the samples data.",
              "alert-error")
        return samples_uploads_page

    firstlineheading = (request.form.get("first_line_heading") == "on")

    separator = request.form.get("separator", ",")
    if separator == "other":
        separator = request.form.get("other_separator", ",")
    if not bool(separator):
        flash("You need to provide a separator character.", "alert-error")
        return samples_uploads_page

    # An empty form value falls back to the double-quote character too.
    quotechar = (request.form.get("field_delimiter", '"') or '"')

    redisuri = app.config["REDIS_URL"]
    with Redis.from_url(redisuri, decode_responses=True) as rconn:
        #TODO: Add a QC step here — what do we check?
        # 1. Does any sample in the uploaded file exist within the database?
        #    If yes, what is/are its/their species and population?
        # 2. If yes 1. above, provide error with notes on which species and
        #    populations already own the samples.
        the_job = jobs.launch_job(
            jobs.initialise_job(
                rconn,
                jobs.jobsnamespace(),
                str(uuid.uuid4()),
                build_sample_upload_job(
                    species["SpeciesId"],
                    population["InbredSetId"],
                    samples_file,
                    separator,
                    firstlineheading,
                    quotechar),
                "samples_upload",
                app.config["JOBS_TTL_SECONDS"],
                {"job_name": f"Samples Upload: {samples_file.name}"}),
            redisuri,
            f"{app.config['UPLOAD_FOLDER']}/job_errors")
        return redirect(url_for(
            "species.populations.samples.upload_status",
            species_id=species_id,
            population_id=population_id,
            job_id=the_job["jobid"]))
+
+
@samplesbp.route("<int:species_id>/populations/<int:population_id>/"
                 "upload-samples/status/<uuid:job_id>",
                 methods=["GET"])
@require_login
@with_population(species_redirect_uri="species.populations.samples.index",
                 redirect_uri="species.populations.samples.select_population")
def upload_status(species: dict, population: dict, job_id: uuid.UUID, **kwargs):# pylint: disable=[unused-argument]
    """Check on the status of a samples upload job.

    * Unknown job: render the "no such job" page with HTTP status 400.
    * Job status "success": render the success page.
    * Job status "error", or a non-empty error file exists for the job:
      redirect to the upload-failure page.
    * Anything else: render the progress page.
    """
    job = with_redis_connection(lambda rconn: jobs.job(
        rconn, jobs.jobsnamespace(), job_id))
    if not job:
        return render_template("no_such_job.html",
                               job_id=job_id,
                               species=species,
                               population=population), 400

    # Both failure paths must use the same fully-qualified endpoint that the
    # rest of this module uses for views on this blueprint.
    failure_endpoint = "species.populations.samples.upload_failure"

    status = job["status"]
    if status == "success":
        return render_template("samples/upload-success.html",
                               job=job,
                               species=species,
                               population=population,)

    if status == "error":
        return redirect(url_for(failure_endpoint, job_id=job_id))

    # A non-empty error file is treated as a failure even though the job's
    # recorded status is not "error".
    error_filename = Path(jobs.error_filename(
        job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors"))
    if error_filename.exists() and os.stat(error_filename).st_size > 0:
        return redirect(url_for(failure_endpoint, job_id=job_id))

    return render_template("samples/upload-progress.html",
                           species=species,
                           population=population,
                           job=job) # maybe also handle this?
+
@samplesbp.route("/upload/failure/<uuid:job_id>", methods=["GET"])
@require_login
def upload_failure(job_id: uuid.UUID):
    """Show what went wrong with a failed samples upload.

    An unknown job renders the "no such job" page with HTTP status 400. If
    a non-empty error file exists for the job, the worker-failure page is
    rendered; otherwise the upload-failure page is rendered with the job's
    details.
    """
    job = with_redis_connection(
        lambda rconn: jobs.job(rconn, jobs.jobsnamespace(), job_id))
    if not job:
        return render_template("no_such_job.html", job_id=job_id), 400

    errors_dir = f"{app.config['UPLOAD_FOLDER']}/job_errors"
    error_filename = Path(jobs.error_filename(job_id, errors_dir))
    if error_filename.exists() and os.stat(error_filename).st_size > 0:
        return render_template("worker_failure.html", job_id=job_id)

    return render_template("samples/upload-failure.html", job=job)