diff options
Diffstat (limited to 'uploader/expression_data/samples.py')
-rw-r--r-- | uploader/expression_data/samples.py | 359 |
1 files changed, 359 insertions, 0 deletions
diff --git a/uploader/expression_data/samples.py b/uploader/expression_data/samples.py new file mode 100644 index 0000000..95b9b73 --- /dev/null +++ b/uploader/expression_data/samples.py @@ -0,0 +1,359 @@ +"""Code regarding samples""" +import os +import sys +import csv +import uuid +from pathlib import Path +from typing import Iterator + +import MySQLdb as mdb +from redis import Redis +from MySQLdb.cursors import DictCursor +from flask import ( + flash, + request, + url_for, + redirect, + Blueprint, + render_template, + current_app as app) + +from functional_tools import take + +from uploader import jobs +from uploader.files import save_file +from uploader.authorisation import require_login +from uploader.input_validation import is_integer_input +from uploader.db_utils import ( + with_db_connection, + database_connection, + with_redis_connection) +from uploader.db import ( + species_by_id, + save_population, + population_by_id, + populations_by_species, + species as fetch_species) + +samples = Blueprint("samples", __name__) + +@samples.route("/upload/species", methods=["GET", "POST"]) +@require_login +def select_species(): + """Select the species.""" + if request.method == "GET": + return render_template("samples/select-species.html", + species=with_db_connection(fetch_species)) + + index_page = redirect(url_for("expression-data.index.upload_file")) + species_id = request.form.get("species_id") + if bool(species_id): + species_id = int(species_id) + species = with_db_connection( + lambda conn: species_by_id(conn, species_id)) + if bool(species): + return redirect(url_for( + "samples.select_population", species_id=species_id)) + flash("Invalid species selected!", "alert-error") + flash("You need to select a species", "alert-error") + return index_page + +@samples.route("/upload/species/<int:species_id>/create-population", + methods=["POST"]) +@require_login +def create_population(species_id: int): + """Create new grouping/population.""" + if not is_integer_input(species_id): + flash("You did not provide a valid species. Please select one to " + "continue.", + "alert-danger") + return redirect(url_for("expression-data.samples.select_species")) + species = with_db_connection(lambda conn: species_by_id(conn, species_id)) + if not bool(species): + flash("Species with given ID was not found.", "alert-danger") + return redirect(url_for("expression-data.samples.select_species")) + + species_page = redirect(url_for("expression-data.samples.select_species"), code=307) + with database_connection(app.config["SQL_URI"]) as conn: + species = species_by_id(conn, species_id) + pop_name = request.form.get("inbredset_name", "").strip() + pop_fullname = request.form.get("inbredset_fullname", "").strip() + + if not bool(species): + flash("Invalid species!", "alert-error error-create-population") + return species_page + if (not bool(pop_name)) or (not bool(pop_fullname)): + flash("You *MUST* provide a grouping/population name", + "alert-error error-create-population") + return species_page + + pop = save_population(conn, { + "SpeciesId": species["SpeciesId"], + "Name": pop_name, + "InbredSetName": pop_fullname, + "FullName": pop_fullname, + "Family": request.form.get("inbredset_family") or None, + "Description": request.form.get("description") or None + }) + + flash("Grouping/Population created successfully.", "alert-success") + return redirect(url_for("samples.upload_samples", + species_id=species_id, + population_id=pop["population_id"])) + +@samples.route("/upload/species/<int:species_id>/population", + methods=["GET", "POST"]) +@require_login +def select_population(species_id: int): + """Select from existing groupings/populations.""" + if not is_integer_input(species_id): + flash("You did not provide a valid species. Please select one to " + "continue.", + "alert-danger") + return redirect(url_for("expression-data.samples.select_species")) + species = with_db_connection(lambda conn: species_by_id(conn, species_id)) + if not bool(species): + flash("Species with given ID was not found.", "alert-danger") + return redirect(url_for("expression-data.samples.select_species")) + + if request.method == "GET": + return render_template( + "samples/select-population.html", + species=species, + populations=with_db_connection( + lambda conn: populations_by_species(conn, species_id))) + + population_page = redirect(url_for( + "samples.select_population", species_id=species_id), code=307) + _population_id = request.form.get("inbredset_id") + if not is_integer_input(_population_id): + flash("You did not provide a valid population. Please select one to " + "continue.", + "alert-danger") + return population_page + population = with_db_connection( + lambda conn: population_by_id(conn, _population_id)) + if not bool(population): + flash("Invalid grouping/population!", + "alert-error error-select-population") + return population_page + + return redirect(url_for("samples.upload_samples", + species_id=species_id, + population_id=_population_id), + code=307) + +def read_samples_file(filepath, separator: str, firstlineheading: bool, **kwargs) -> Iterator[dict]: + """Read the samples file.""" + with open(filepath, "r", encoding="utf-8") as inputfile: + reader = csv.DictReader( + inputfile, + fieldnames=( + None if firstlineheading + else ("Name", "Name2", "Symbol", "Alias")), + delimiter=separator, + quotechar=kwargs.get("quotechar", '"')) + for row in reader: + yield row + +def save_samples_data(conn: mdb.Connection, + speciesid: int, + file_data: Iterator[dict]): + """Save the samples to DB.""" + data = ({**row, "SpeciesId": speciesid} for row in file_data) + total = 0 + with conn.cursor() as cursor: + while True: + batch = take(data, 5000) + if len(batch) == 0: + break + cursor.executemany( + "INSERT INTO Strain(Name, Name2, SpeciesId, Symbol, Alias) " + "VALUES(" + " %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s" + ") ON DUPLICATE KEY UPDATE Name=Name", + batch) + total += len(batch) + print(f"\tSaved {total} samples total so far.") + +def cross_reference_samples(conn: mdb.Connection, + species_id: int, + population_id: int, + strain_names: Iterator[str]): + """Link samples to their population.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s", + (population_id,)) + last_order_id = (cursor.fetchone()["loid"] or 10) + total = 0 + while True: + batch = take(strain_names, 5000) + if len(batch) == 0: + break + params_str = ", ".join(["%s"] * len(batch)) + ## This query is slow -- investigate. + cursor.execute( + "SELECT s.Id FROM Strain AS s LEFT JOIN StrainXRef AS sx " + "ON s.Id = sx.StrainId WHERE s.SpeciesId=%s AND s.Name IN " + f"({params_str}) AND sx.StrainId IS NULL", + (species_id,) + tuple(batch)) + strain_ids = (sid["Id"] for sid in cursor.fetchall()) + params = tuple({ + "pop_id": population_id, + "strain_id": strain_id, + "order_id": last_order_id + (order_id * 10), + "mapping": "N", + "pedigree": None + } for order_id, strain_id in enumerate(strain_ids, start=1)) + cursor.executemany( + "INSERT INTO StrainXRef( " + " InbredSetId, StrainId, OrderId, Used_for_mapping, PedigreeStatus" + ")" + "VALUES (" + " %(pop_id)s, %(strain_id)s, %(order_id)s, %(mapping)s, " + " %(pedigree)s" + ")", + params) + last_order_id += (len(params) * 10) + total += len(batch) + print(f"\t{total} total samples cross-referenced to the population " + "so far.") + +def build_sample_upload_job(# pylint: disable=[too-many-arguments] + speciesid: int, + populationid: int, + samplesfile: Path, + separator: str, + firstlineheading: bool, + quotechar: str): + """Define the async command to run the actual samples data upload.""" + return [ + sys.executable, "-m", "scripts.insert_samples", app.config["SQL_URI"], + str(speciesid), str(populationid), str(samplesfile.absolute()), + separator, f"--redisuri={app.config['REDIS_URL']}", + f"--quotechar={quotechar}" + ] + (["--firstlineheading"] if firstlineheading else []) + +@samples.route("/upload/species/<int:species_id>/populations/<int:population_id>/samples", + methods=["GET", "POST"]) +@require_login +def upload_samples(species_id: int, population_id: int):#pylint: disable=[too-many-return-statements] + """Upload the samples.""" + samples_uploads_page = redirect(url_for("samples.upload_samples", + species_id=species_id, + population_id=population_id)) + if not is_integer_input(species_id): + flash("You did not provide a valid species. Please select one to " + "continue.", + "alert-danger") + return redirect(url_for("expression-data.samples.select_species")) + species = with_db_connection(lambda conn: species_by_id(conn, species_id)) + if not bool(species): + flash("Species with given ID was not found.", "alert-danger") + return redirect(url_for("expression-data.samples.select_species")) + + if not is_integer_input(population_id): + flash("You did not provide a valid population. Please select one " + "to continue.", + "alert-danger") + return redirect(url_for("samples.select_population", + species_id=species_id), + code=307) + population = with_db_connection( + lambda conn: population_by_id(conn, int(population_id))) + if not bool(population): + flash("Invalid grouping/population!", "alert-error") + return redirect(url_for("samples.select_population", + species_id=species_id), + code=307) + + if request.method == "GET" or request.files.get("samples_file") is None: + return render_template("samples/upload-samples.html", + species=species, + population=population) + + try: + samples_file = save_file(request.files["samples_file"], + Path(app.config["UPLOAD_FOLDER"])) + except AssertionError: + flash("You need to provide a file with the samples data.", + "alert-error") + return samples_uploads_page + + firstlineheading = (request.form.get("first_line_heading") == "on") + + separator = request.form.get("separator", ",") + if separator == "other": + separator = request.form.get("other_separator", ",") + if not bool(separator): + flash("You need to provide a separator character.", "alert-error") + return samples_uploads_page + + quotechar = (request.form.get("field_delimiter", '"') or '"') + + redisuri = app.config["REDIS_URL"] + with Redis.from_url(redisuri, decode_responses=True) as rconn: + the_job = jobs.launch_job( + jobs.initialise_job( + rconn, + jobs.jobsnamespace(), + str(uuid.uuid4()), + build_sample_upload_job( + species["SpeciesId"], + population["InbredSetId"], + samples_file, + separator, + firstlineheading, + quotechar), + "samples_upload", + app.config["JOBS_TTL_SECONDS"], + {"job_name": f"Samples Upload: {samples_file.name}"}), + redisuri, + f"{app.config['UPLOAD_FOLDER']}/job_errors") + return redirect(url_for( + "samples.upload_status", job_id=the_job["jobid"])) + +@samples.route("/upload/status/<uuid:job_id>", methods=["GET"]) +def upload_status(job_id: uuid.UUID): + """Check on the status of a samples upload job.""" + job = with_redis_connection(lambda rconn: jobs.job( + rconn, jobs.jobsnamespace(), job_id)) + if job: + status = job["status"] + if status == "success": + return render_template("samples/upload-success.html", job=job) + + if status == "error": + return redirect(url_for("samples.upload_failure", job_id=job_id)) + + error_filename = Path(jobs.error_filename( + job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")) + if error_filename.exists(): + stat = os.stat(error_filename) + if stat.st_size > 0: + return redirect(url_for( + "samples.upload_failure", job_id=job_id)) + + return render_template( + "samples/upload-progress.html", + job=job) # maybe also handle this? + + return render_template("no_such_job.html", job_id=job_id), 400 + +@samples.route("/upload/failure/<uuid:job_id>", methods=["GET"]) +def upload_failure(job_id: uuid.UUID): + """Display the errors of the samples upload failure.""" + job = with_redis_connection(lambda rconn: jobs.job( + rconn, jobs.jobsnamespace(), job_id)) + if not bool(job): + return render_template("no_such_job.html", job_id=job_id), 400 + + error_filename = Path(jobs.error_filename( + job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")) + if error_filename.exists(): + stat = os.stat(error_filename) + if stat.st_size > 0: + return render_template("worker_failure.html", job_id=job_id) + + return render_template("samples/upload-failure.html", job=job) |