"""Code regarding samples""" import csv from pathlib import Path from typing import Iterator import MySQLdb as mdb from MySQLdb.cursors import DictCursor from flask import ( flash, request, url_for, redirect, Blueprint, render_template, current_app as app) from quality_control.parsing import take from .files import save_file from .dbinsert import species_by_id, groups_by_species from .db_utils import with_db_connection, database_connection samples = Blueprint("samples", __name__) @samples.route("/upload/species", methods=["POST"]) def select_species(): """Select the species.""" index_page = redirect(url_for("entry.upload_file")) species_id = request.form.get("species_id") if bool(species_id): species_id = int(species_id) species = with_db_connection( lambda conn: species_by_id(conn, species_id)) if bool(species): return render_template( "samples/select-population.html", species=species, populations=groups_by_species(species_id)) flash("Invalid species selected!", "alert-error") flash("You need to select a species", "alert-error") return index_page def save_population(conn: mdb.Connection, population_details: dict) -> int: """Save the population details to the db.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute("SELECT MAX(Id) AS last_id FROM InbredSet") new_id = cursor.fetchone()["last_id"] + 1 cursor.execute( "INSERT INTO InbredSet(" "Id, InbredSetId, InbredSetName, Name, SpeciesId, FullName, " "MenuOrderId, Description" ") " "VALUES (" "%(Id)s, %(InbredSetId)s, %(InbredSetName)s, %(Name)s, " "%(SpeciesId)s, %(FullName)s, %(MenuOrderId)s, %(Description)s" ")", { "Id": new_id, "InbredSetId": new_id, "MenuOrderId": 0, **population_details }) return new_id def population_by_id(conn: mdb.Connection, population_id: int) -> dict: """Get the grouping/population by id.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute("SELECT * FROM InbredSet WHERE InbredSetId=%s", (population_id,)) return cursor.fetchone() 
def _form_int(field_name: str):
    """Return the form field `field_name` as an int, or None if absent/invalid."""
    try:
        return int(request.form.get(field_name))
    except (TypeError, ValueError):
        return None


@samples.route("/upload/create-population", methods=["POST"])
def create_population():
    """Create new grouping/population.

    Validates the species and the population names from the submitted form,
    saves the new population, then renders the samples-upload page for it.
    On invalid input, flash an error and redirect (307, so the POST data is
    re-sent) to the species-selection endpoint.
    """
    species_page = redirect(url_for("samples.select_species"), code=307)
    with database_connection(app.config["SQL_URI"]) as conn:
        species = species_by_id(conn, request.form.get("species_id"))
        # A missing form field yields None: treat it as an empty name so the
        # validation below reports it instead of `.strip()` raising.
        pop_name = (request.form.get("inbredset_name") or "").strip()
        pop_fullname = (request.form.get("inbredset_fullname") or "").strip()

        if not bool(species):
            flash("Invalid species!", "alert-error error-create-population")
            return species_page
        if (not bool(pop_name)) or (not bool(pop_fullname)):
            flash("You *MUST* provide a grouping/population name",
                  "alert-error error-create-population")
            return species_page

        pop_id = save_population(conn, {
            "SpeciesId": species["SpeciesId"],
            "Name": pop_name,
            "InbredSetName": pop_fullname,
            "FullName": pop_fullname,
            "Family": request.form.get("inbredset_family") or None,
            "Description": request.form.get("description") or None
        })

    # The connection used for the insert is closed before the new row is
    # read back through a separate connection.
    flash("Grouping/Population created successfully.", "alert-success")
    return render_template(
        "samples/upload-samples.html",
        species=species,
        population=with_db_connection(
            lambda cnx: population_by_id(cnx, pop_id)))


@samples.route("/upload/select-population", methods=["POST"])
def select_population():
    """Select from existing groupings/populations.

    Renders the samples-upload page for the species and population named in
    the form. An unknown species or population (including a missing or
    non-numeric `inbredset_id`) flashes an error and redirects (307) to the
    species-selection endpoint.
    """
    species_page = redirect(url_for("samples.select_species"), code=307)
    with database_connection(app.config["SQL_URI"]) as conn:
        species = species_by_id(conn, request.form.get("species_id"))
        pop_id = _form_int("inbredset_id")
        population = (
            with_db_connection(lambda cnx: population_by_id(cnx, pop_id))
            if pop_id is not None else None)

        if not bool(species):
            flash("Invalid species!", "alert-error error-select-population")
            return species_page

        if not bool(population):
            flash("Invalid grouping/population!",
                  "alert-error error-select-population")
            return species_page

        return render_template("samples/upload-samples.html",
                               species=species,
                               population=population)


def read_samples_file(filepath,
                      separator: str,
                      firstlineheading: bool,
                      **kwargs) -> Iterator[dict]:
    """Read the samples file.

    Lazily yields one dict per row of the delimited text file at `filepath`.

    filepath: path of the UTF-8 encoded file to read.
    separator: the field delimiter.
    firstlineheading: when True, the first line supplies the field names;
        otherwise the columns are named "Name", "Name2", "Symbol", "Alias".
    kwargs: only `quotechar` is read (defaults to the double-quote).
    """
    # `newline=""` lets the csv module do its own newline handling, as its
    # documentation requires (e.g. for newlines inside quoted fields).
    with open(filepath, "r", encoding="utf-8", newline="") as inputfile:
        reader = csv.DictReader(
            inputfile,
            fieldnames=(
                None if firstlineheading
                else ("Name", "Name2", "Symbol", "Alias")),
            delimiter=separator,
            quotechar=kwargs.get("quotechar", '"'))
        yield from reader


def save_samples_data(conn: mdb.Connection,
                      speciesid: int,
                      file_data: Iterator[dict]):
    """Save the samples to DB.

    Inserts the rows of `file_data` into `Strain` in batches of up to 5000,
    attaching `speciesid` to each. Rows that collide with an existing key
    are left as they are (`ON DUPLICATE KEY UPDATE Name=Name`).
    """
    # Columns other than "Name" default to NULL so that a file whose heading
    # omits them does not fail on a missing query parameter.
    optional_fields = {"Name2": None, "Symbol": None, "Alias": None}
    data = ({**optional_fields, **row, "SpeciesId": speciesid}
            for row in file_data)
    with conn.cursor() as cursor:
        while True:
            batch = take(data, 5000)
            if len(batch) == 0:
                break
            cursor.executemany(
                "INSERT INTO Strain(Name, Name2, SpeciesId, Symbol, Alias) "
                "VALUES("
                "    %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s"
                ") ON DUPLICATE KEY UPDATE Name=Name",
                batch)


def cross_reference_samples(conn: mdb.Connection,
                            species_id: int,
                            population_id: int,
                            strain_names: Iterator[str]):
    """Link samples to their population.

    Processes `strain_names` in batches of up to 5000. For each batch, the
    strains of the given species with those names that have no `StrainXRef`
    row at all are inserted into `StrainXRef` for `population_id`, with
    consecutive `OrderId` values starting 10 above the population's current
    maximum.
    """
    with conn.cursor(cursorclass=DictCursor) as cursor:
        while True:
            batch = take(strain_names, 5000)
            if len(batch) == 0:
                break
            params_str = ", ".join(["%s"] * len(batch))
            ## This query is slow -- investigate.
            cursor.execute(
                "SELECT s.Id FROM Strain AS s LEFT JOIN StrainXRef AS sx "
                "ON s.Id = sx.StrainId WHERE s.SpeciesId=%s AND s.Name IN "
                f"({params_str}) AND sx.StrainId IS NULL",
                (species_id,) + tuple(batch))
            strain_ids = tuple(sid["Id"] for sid in cursor.fetchall())
            if not strain_ids:
                # Nothing in this batch needs linking.
                continue
            cursor.execute(
                "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s",
                (population_id,))
            # MAX() yields NULL (None) for a population with no linked
            # strains yet, e.g. one that was just created.
            last_order_id = cursor.fetchone()["loid"] or 0
            cursor.executemany(
                "INSERT INTO StrainXRef( "
                "  InbredSetId, StrainId, OrderId, Used_for_mapping, PedigreeStatus"
                ")"
                "VALUES ("
                "  %(pop_id)s, %(strain_id)s, %(order_id)s, %(mapping)s, "
                "  %(pedigree)s"
                ")",
                tuple({
                    "pop_id": population_id,
                    "strain_id": strain_id,
                    "order_id": order_id,
                    "mapping": "N",
                    "pedigree": None
                } for order_id, strain_id in
                      enumerate(strain_ids, start=(last_order_id + 10))))


@samples.route("/upload/samples", methods=["POST"])
def upload_samples():
    """Upload the samples.

    Validates the species, population, uploaded file and separator from the
    form, saves the samples in the file, then links them to the population.
    Any validation failure flashes an error and redirects (307) to the
    population-selection endpoint.
    """
    samples_uploads_page = redirect(url_for("samples.select_population"),
                                    code=307)

    with database_connection(app.config["SQL_URI"]) as conn:
        species = species_by_id(conn, request.form.get("species_id"))
        if not bool(species):
            flash("Invalid species!", "alert-error")
            return samples_uploads_page

        # A missing/non-numeric id is reported as an invalid population
        # rather than raising from `int(...)`.
        pop_id = _form_int("inbredset_id")
        population = (
            with_db_connection(lambda cnx: population_by_id(cnx, pop_id))
            if pop_id is not None else None)
        if not bool(population):
            flash("Invalid grouping/population!", "alert-error")
            return samples_uploads_page

        samples_file = save_file(request.files["samples_file"],
                                 Path(app.config["UPLOAD_FOLDER"]))
        if not bool(samples_file):
            flash("You need to provide a file with the samples data.",
                  "alert-error")
            return samples_uploads_page

        firstlineheading = (request.form.get("first_line_heading") == "on")

        separator = request.form.get("separator")
        if separator == "other":
            separator = request.form.get("other_separator")
        if not bool(separator):
            flash("You need to provide a separator character.", "alert-error")
            return samples_uploads_page

        save_samples_data(
            conn,
            species["SpeciesId"],
            read_samples_file(samples_file, separator, firstlineheading))
        # The file is read a second time: the first pass was consumed by the
        # insert above.
        cross_reference_samples(
            conn,
            species["SpeciesId"],
            population["InbredSetId"],
            (row["Name"] for row in
             read_samples_file(samples_file, separator, firstlineheading)))

    return "SUCCESS: Respond with a better UI than this."