aboutsummaryrefslogtreecommitdiff
path: root/qc_app/samples.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/samples.py')
-rw-r--r--qc_app/samples.py95
1 files changed, 94 insertions, 1 deletions
diff --git a/qc_app/samples.py b/qc_app/samples.py
index cc745ca..27fdad3 100644
--- a/qc_app/samples.py
+++ b/qc_app/samples.py
@@ -1,9 +1,22 @@
"""Code regarding samples"""
+import csv
+from pathlib import Path
+from typing import Iterator
+
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
from flask import (
- flash, request, url_for, redirect, Blueprint, render_template, current_app as app)
+ flash,
+ request,
+ url_for,
+ redirect,
+ Blueprint,
+ render_template,
+ current_app as app)
+
+from quality_control.parsing import take
+from .files import save_file
from .db_utils import with_db_connection
from .dbinsert import species_by_id, groups_by_species
@@ -106,3 +119,83 @@ def select_population():
return render_template("samples/upload-samples.html",
species=species,
population=population)
+
+def read_samples_file(filepath, separator: str, **kwargs) -> Iterator[dict]:
+ """Read the samples file."""
+ with open(filepath, "r", encoding="utf-8") as inputfile:
+ reader = csv.DictReader(
+ inputfile,
+ fieldnames=("Name", "Name2", "Symbol", "Alias"),
+ delimiter=separator,
+ quotechar=kwargs.get("quotechar"))
+ for row in reader:
+ yield row
+
+def save_samples_data(conn: mdb.Connection, file_data: Iterator[dict]):
+ """Save the samples to DB."""
+ with conn.cursor() as cursor:
+ while True:
+ cursor.executemany(
+ "INSERT INTO Strain(Name, Name2, SpeciesId, Symbol, Alias) "
+ "VALUES("
+ " %(Name)s, %(Name2)s, %(SpeciesId)s, %(Symbol)s, %(Alias)s"
+ ")",
+ tuple(take(file_data, 10000)))
+
+def cross_reference_samples(conn: mdb.Connection,
+ population_id: int,
+ strain_names: tuple[str, ...]):
+ """Link samples to their population."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ params_str = ", ".join(["%s"] * len(strain_names))
+ cursor.execute(
+ "SELECT Id FROM Strain WHERE (Name, SpeciesId) IN "
+ f"{params_str}",
+ tuple((name, species["SpeciesId"]) for name in strain_names))
+ strain_ids = (sid for sid in cursor.fetchall())
+ cursor.execute(
+ "SELECT MAX(OrderId) AS loid FROM StrainXRef WHERE InbredSetId=%s",
+ (population_id,))
+ last_order_id = cursor.fetchone()["loid"]
+ cursor.executemany(
+ "INSERT INTO StrainXRef(InbredSetId, StrainId, OrderId) "
+ "VALUES (%(pop_id)s, %(strain_id)s, %(order_id)s)",
+ tuple({
+ "pop_id": population_id,
+ "strain_id": strain_id,
+ "order_id": order_id
+ } for order_id, strain_id in
+ enumerate(strain_ids, start=(last_order_id+10))))
+
+@samples.route("/upload/samples", methods=["POST"])
+def upload_samples():
+ """Upload the samples."""
+ samples_uploads_page = redirect(url_for("samples.select_population"),
+ code=307)
+
+ species = species_by_id(request.form.get("species_id"))
+ if not bool(species):
+ flash("Invalid species!", "alert-error")
+ return samples_uploads_page
+
+ population = with_db_connection(
+ lambda conn: population_by_id(
+ conn, int(request.form.get("inbredset_id"))))
+ if not bool(population):
+ flash("Invalid grouping/population!", "alert-error")
+ return samples_uploads_page
+
+ samples_file = save_file("samples_file", Path(app.config["UPLOAD_FOLDER"]))
+ if not bool(samples_file):
+ flash("You need to provide a file with the samples data.")
+ return samples_uploads_page
+
+ def __insert_samples__(conn: mdb.Connection):
+ save_samples_data(conn, read_samples_file(samples_file))
+ cross_reference_samples(
+ conn,
+ population["InbredSetId"],
+ tuple(row["Name"] for row in read_samples_file(samples_file)))
+
+ with_db_connection(__insert_samples__)
+ return "SUCCESS: Respond with a better UI than this."