diff options
Diffstat (limited to 'uploader/genotypes')
-rw-r--r-- | uploader/genotypes/models.py | 70 | ||||
-rw-r--r-- | uploader/genotypes/views.py | 146 |
2 files changed, 162 insertions, 54 deletions
diff --git a/uploader/genotypes/models.py b/uploader/genotypes/models.py index 642caa6..44c98b1 100644 --- a/uploader/genotypes/models.py +++ b/uploader/genotypes/models.py @@ -1,8 +1,9 @@ """Functions for handling genotypes.""" from typing import Optional +from datetime import datetime import MySQLdb as mdb -from MySQLdb.cursors import DictCursor +from MySQLdb.cursors import Cursor, DictCursor from uploader.db_utils import debug_query @@ -32,7 +33,7 @@ def genotype_markers( ) -> tuple[dict, ...]: """Retrieve markers from the database.""" _query = "SELECT * FROM Geno WHERE SpeciesId=%s" - if bool(limit) and limit > 0: + if bool(limit) and limit > 0:# type: ignore[operator] _query = _query + f" LIMIT {limit} OFFSET {offset}" with conn.cursor(cursorclass=DictCursor) as cursor: @@ -41,17 +42,60 @@ def genotype_markers( return tuple(dict(row) for row in cursor.fetchall()) -def genotype_datasets( +def genotype_dataset( conn: mdb.Connection, species_id: int, - population_id: int -) -> tuple[dict, ...]: - """Retrieve genotype datasets from the database.""" + population_id: int, + dataset_id: Optional[int] = None +) -> Optional[dict]: + """Retrieve genotype datasets from the database. + + Apparently, you should only ever have one genotype dataset for a population. + """ + _query = ( + "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset " + "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf " + "ON iset.Id=gf.InbredSetId " + "WHERE s.Id=%s AND iset.Id=%s") + _params = (species_id, population_id) + if bool(dataset_id): + _query = _query + " AND gf.Id=%s" + _params = _params + (dataset_id,)# type: ignore[assignment] + with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute( - "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset " - "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf " - "ON iset.Id=gf.InbredSetId " - "WHERE s.Id=%s AND iset.Id=%s", - (species_id, population_id)) - return tuple(dict(row) for row in cursor.fetchall()) + cursor.execute(_query, _params) + debug_query(cursor) + result = cursor.fetchone() + if bool(result): + return dict(result) + return None + + +def save_new_dataset( + cursor: Cursor, + population_id: int, + name: str, + fullname: str, + shortname: str +) -> dict: + """Save a new genotype dataset into the database.""" + params = { + "InbredSetId": population_id, + "Name": name, + "FullName": fullname, + "ShortName": shortname, + "CreateTime": datetime.now().date().isoformat(), + "public": 2, + "confidentiality": 0, + "AuthorisedUsers": None + } + cursor.execute( + "INSERT INTO GenoFreeze(" + "Name, FullName, ShortName, CreateTime, public, InbredSetId, " + "confidentiality, AuthorisedUsers" + ") VALUES (" + "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, " + "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s" + ")", + params) + return {**params, "Id": cursor.lastrowid} diff --git a/uploader/genotypes/views.py b/uploader/genotypes/views.py index 294da0e..0821eca 100644 --- a/uploader/genotypes/views.py +++ b/uploader/genotypes/views.py @@ -1,4 +1,5 @@ """Views for the genotypes.""" +from MySQLdb.cursors import DictCursor from flask import (flash, request, url_for, @@ -7,19 +8,25 @@ from flask import (flash, render_template, current_app as app) +from uploader.ui import make_template_renderer +from uploader.oauth2.client import oauth2_post from uploader.authorisation import require_login from uploader.db_utils import database_connection from uploader.species.models import all_species, species_by_id +from uploader.monadic_requests import make_either_error_handler +from uploader.request_checks import with_species, with_population from uploader.datautils import safe_int, order_by_family, enumerate_sequence from uploader.population.models import (populations_by_species, population_by_species_and_id) from .models import (genotype_markers, - genotype_datasets, + genotype_dataset, + save_new_dataset, genotype_markers_count, genocode_by_population) genotypesbp = Blueprint("genotypes", __name__) +render_template = make_template_renderer("genotypes") @genotypesbp.route("populations/genotypes", methods=["GET"]) @require_login @@ -42,14 +49,10 @@ def index(): @genotypesbp.route("/<int:species_id>/populations/genotypes/select-population", methods=["GET"]) @require_login -def select_population(species_id: int): +@with_species(redirect_uri="species.populations.genotypes.index") +def select_population(species: dict, species_id: int): """Select the population under which the genotypes go.""" with database_connection(app.config["SQL_URI"]) as conn: - species = species_by_id(conn, species_id) - if not bool(species): - flash("Invalid species provided!", "alert-danger") - return redirect(url_for("species.populations.genotypes.index")) - if not bool(request.args.get("population_id")): return render_template("genotypes/select-population.html", species=species, @@ -75,58 +78,44 @@ def select_population(species_id: int): "/<int:species_id>/populations/<int:population_id>/genotypes", methods=["GET"]) @require_login -def list_genotypes(species_id: int, population_id: int): +@with_population(species_redirect_uri="species.populations.genotypes.index", + redirect_uri="species.populations.genotypes.select_population") +def list_genotypes(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument] """List genotype details for species and population.""" with database_connection(app.config["SQL_URI"]) as conn: - species = species_by_id(conn, species_id) - if not bool(species): - flash("Invalid species provided!", "alert-danger") - return redirect(url_for("species.populations.genotypes.index")) - - population = population_by_species_and_id( - conn, species_id, population_id) - if not bool(population): - flash("Invalid population selected!", "alert-danger") - return redirect(url_for( - "species.populations.genotypes.select_population", - species_id=species_id)) - return render_template("genotypes/list-genotypes.html", species=species, population=population, genocode=genocode_by_population( - conn, population_id), + conn, population["Id"]), total_markers=genotype_markers_count( - conn, species_id), - datasets=genotype_datasets( - conn, species_id, population_id), + conn, species["SpeciesId"]), + dataset=genotype_dataset(conn, + species["SpeciesId"], + population["Id"]), activelink="list-genotypes") @genotypesbp.route("/<int:species_id>/genotypes/list-markers", methods=["GET"]) @require_login -def list_markers(species_id: int): +@with_species(redirect_uri="species.populations.genotypes.index") +def list_markers(species: dict, **kwargs):# pylint: disable=[unused-argument] """List a species' genetic markers.""" with database_connection(app.config["SQL_URI"]) as conn: - species = species_by_id(conn, species_id) - if not bool(species): - flash("Invalid species provided!", "alert-danger") - return redirect(url_for("species.populations.genotypes.index")) - - start_from = safe_int(request.args.get("start_from") or 0) - if start_from < 0: - start_from = 0 + start_from = max(safe_int(request.args.get("start_from") or 0), 0) count = safe_int(request.args.get("count") or 20) - markers = enumerate_sequence( - genotype_markers(conn, species_id, offset=start_from, limit=count), - start=start_from+1) return render_template("genotypes/list-markers.html", species=species, total_markers=genotype_markers_count( - conn, species_id), + conn, species["SpeciesId"]), start_from=start_from, count=count, - markers=markers, + markers=enumerate_sequence( + genotype_markers(conn, + species["SpeciesId"], + offset=start_from, + limit=count), + start=start_from+1), activelink="list-markers") @genotypesbp.route( @@ -136,5 +125,80 @@ def list_markers(species_id: int): @require_login def view_dataset(species_id: int, population_id: int, dataset_id: int): """View details regarding a specific dataset.""" - return (f"Genotype dataset '{dataset_id}, from population '{population_id}' " - f"of species '{species_id}'.") + with database_connection(app.config["SQL_URI"]) as conn: + species = species_by_id(conn, species_id) + if not bool(species): + flash("Invalid species provided!", "alert-danger") + return redirect(url_for("species.populations.genotypes.index")) + + population = population_by_species_and_id( + conn, species_id, population_id) + if not bool(population): + flash("Invalid population selected!", "alert-danger") + return redirect(url_for( + "species.populations.genotypes.select_population", + species_id=species_id)) + + dataset = genotype_dataset(conn, species_id, population_id, dataset_id) + if not bool(dataset): + flash("Could not find such a dataset!", "alert-danger") + return redirect(url_for( + "species.populations.genotypes.list_genotypes", + species_id=species_id, + population_id=population_id)) + + return render_template("genotypes/view-dataset.html", + species=species, + population=population, + dataset=dataset, + activelink="view-dataset") + + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/" + "create", + methods=["GET", "POST"]) +@require_login +@with_population(species_redirect_uri="species.populations.genotypes.index", + redirect_uri="species.populations.genotypes.select_population") +def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument] + """Create a genotype dataset.""" + with (database_connection(app.config["SQL_URI"]) as conn, + conn.cursor(cursorclass=DictCursor) as cursor): + if request.method == "GET": + return render_template("genotypes/create-dataset.html", + species=species, + population=population, + activelink="create-dataset") + + form = request.form + new_dataset = save_new_dataset( + cursor, + population["Id"], + form["geno-dataset-name"], + form["geno-dataset-fullname"], + form["geno-dataset-shortname"]) + + def __success__(_success): + flash("Successfully created genotype dataset.", "alert-success") + return redirect(url_for( + "species.populations.genotypes.list_genotypes", + species_id=species["SpeciesId"], + population_id=population["Id"])) + + return oauth2_post( + "auth/resource/genotypes/create", + json={ + **dict(request.form), + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": new_dataset["Id"], + "dataset_name": form["geno-dataset-name"], + "dataset_fullname": form["geno-dataset-fullname"], + "dataset_shortname": form["geno-dataset-shortname"], + "public": "on" + } + ).either( + make_either_error_handler( + "There was an error creating the genotype dataset."), + __success__) |