"""Code regarding samples""" import csv from pathlib import Path from typing import Iterator import MySQLdb as mdb from MySQLdb.cursors import DictCursor from flask import ( flash, request, url_for, redirect, Blueprint, render_template, current_app as app) from quality_control.parsing import take from .files import save_file from .db_utils import with_db_connection from .dbinsert import species_by_id, groups_by_species samples = Blueprint("samples", __name__) @samples.route("/upload/species", methods=["POST"]) def select_species(): """Select the species.""" index_page = redirect(url_for("entry.upload_file")) species_id = request.form.get("species_id") if bool(species_id): species_id = int(species_id) species = species_by_id(species_id) if bool(species): return render_template( "samples/select-population.html", species=species, populations=groups_by_species(species_id)) flash("Invalid species selected!", "alert-error") flash("You need to select a species", "alert-error") return index_page def save_population(conn: mdb.Connection, population_details: dict) -> int: """Save the population details to the db.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute("SELECT MAX(Id) AS last_id FROM InbredSet") new_id = cursor.fetchone()["last_id"] + 1 cursor.execute( "INSERT INTO InbredSet(" "Id, InbredSetId, InbredSetName, Name, SpeciesId, FullName, " "MenuOrderId, Description" ") " "VALUES (" "%(Id)s, %(InbredSetId)s, %(InbredSetName)s, %(Name)s, " "%(SpeciesId)s, %(FullName)s, %(MenuOrderId)s, %(Description)s" ")", { "Id": new_id, "InbredSetId": new_id, "MenuOrderId": 0, **population_details }) return new_id def population_by_id(conn: mdb.Connection, population_id: int) -> dict: """Get the grouping/population by id.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute("SELECT * FROM InbredSet WHERE InbredSetId=%s", (population_id,)) return cursor.fetchone() @samples.route("/upload/create-population", methods=["POST"]) def create_population(): """Create new grouping/population.""" species_page = redirect(url_for("samples.select_species"), code=307) species = species_by_id(request.form.get("species_id")) pop_name = request.form.get("inbredset_name").strip() pop_fullname = request.form.get("inbredset_fullname").strip() if not bool(species): flash("Invalid species!", "alert-error error-create-population") return species_page if (not bool(pop_name)) or (not bool(pop_fullname)): flash("You *MUST* provide a grouping/population name", "alert-error error-create-population") return species_page pop_id = with_db_connection(lambda conn: save_population(conn, { "SpeciesId": species["SpeciesId"], "Name": pop_name, "InbredSetName": pop_fullname, "FullName": pop_fullname, "Family": request.form.get("inbredset_family") or None, "Description": request.form.get("description") or None })) flash("Grouping/Population created successfully.", "alert-success") return render_template( "samples/upload-samples.html", species=species, population=with_db_connection( lambda conn: population_by_id(conn, pop_id))) @samples.route("/upload/select-population", methods=["POST"]) def select_population(): """Select from existing groupings/populations.""" species_page = redirect(url_for("samples.select_species"), code=307) species = species_by_id(request.form.get("species_id")) pop_id = int(request.form.get("inbredset_id")) population = with_db_connection(lambda conn: population_by_id(conn, pop_id)) if not bool(species): flash("Invalid species!", "alert-error error-select-population") return species_page if not bool(population): flash("Invalid grouping/population!", "alert-error error-select-population") return species_page return render_template("samples/upload-samples.html", species=species, population=population) def read_samples_file(filepath, separator: str, **kwargs) -> Iterator[dict]: """Read the samples file.""" with open(filepath, "r", encoding="utf-8") as inputfile: reader = csv.DictReader( inputfile, fieldnames=("Name", "Name2", "Symbol", "Alias"), delimiter=separator, quotechar=kwargs.get("quotechar")) for row in reader: yield row def save_samples_data(conn: mdb.Connection, file_data: Iterator[dict]): """Save the samples to DB.""" with conn.cursor() as cursor: while True: cursor.executemany( "INSERT INTO Strain(Name, Name2, SpeciesId, Symbol, Alias) " "VALUES(" " %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s" ")", tuple(take(file_data, 10000))) def cross_reference_samples(conn: mdb.Connection, population_id: int, strain_names: tuple[str, ...]): """Link samples to their population.""" with conn.cursor(cursorclass=DictCursor) as cursor: params_str = ", ".join(["%s"] * len(strain_names)) cursor.execute( "SELECT Id FROM Strain WHERE (Name, SpeciesId) IN " f"{params_str}", tuple((name, species["SpeciesId"]) for name in strain_names)) strain_ids = (sid for sid in cursor.fetchall()) cursor.execute( "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s", (population_id,)) last_order_id = cursor.fetchone()["loid"] cursor.executemany( "INSERT INTO StrainXRef(InbredSetId, StrainId, OrderId) " "VALUES (%(pop_id)s, %(strain_id)s, %(order_id)s)", tuple({ "pop_id": population_id, "strain_id": strain_id, "order_id": order_id } for order_id, strain_id in enumerate(strain_ids, start=(last_order_id+10)))) @samples.route("/upload/samples", methods=["POST"]) def upload_samples(): """Upload the samples.""" samples_uploads_page = redirect(url_for("samples.select_population"), code=307) species = species_by_id(request.form.get("species_id")) if not bool(species): flash("Invalid species!", "alert-error") return samples_uploads_page population = with_db_connection( lambda conn: population_by_id( conn, int(request.form.get("inbredset_id")))) if not bool(population): flash("Invalid grouping/population!", "alert-error") return samples_uploads_page samples_file = save_file("samples_file", Path(app.config["UPLOAD_FOLDER"])) if not bool(samples_file): flash("You need to provide a file with the samples data.") return samples_uploads_page def __insert_samples__(conn: mdb.Connection): save_samples_data(conn, read_samples_file(samples_file)) cross_reference_samples( conn, population["InbredSetId"], tuple(row["Name"] for row in read_samples_file(samples_file))) with_db_connection(__insert_samples__) return "SUCCESS: Respond with a better UI than this."