diff options
Diffstat (limited to 'uploader/genotypes')
-rw-r--r-- | uploader/genotypes/__init__.py | 1 | ||||
-rw-r--r-- | uploader/genotypes/models.py | 101 | ||||
-rw-r--r-- | uploader/genotypes/views.py | 204 |
3 files changed, 306 insertions, 0 deletions
diff --git a/uploader/genotypes/__init__.py b/uploader/genotypes/__init__.py new file mode 100644 index 0000000..d0025d6 --- /dev/null +++ b/uploader/genotypes/__init__.py @@ -0,0 +1 @@ +"""The Genotypes module.""" diff --git a/uploader/genotypes/models.py b/uploader/genotypes/models.py new file mode 100644 index 0000000..44c98b1 --- /dev/null +++ b/uploader/genotypes/models.py @@ -0,0 +1,101 @@ +"""Functions for handling genotypes.""" +from typing import Optional +from datetime import datetime + +import MySQLdb as mdb +from MySQLdb.cursors import Cursor, DictCursor + +from uploader.db_utils import debug_query + +def genocode_by_population( + conn: mdb.Connection, population_id: int) -> tuple[dict, ...]: + """Get the allele/genotype codes.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s", + (population_id,)) + return tuple(dict(item) for item in cursor.fetchall()) + + +def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int: + """Find the total count of the genotype markers for a species.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s", + (species_id,)) + return int(cursor.fetchone()["markers_count"]) + + +def genotype_markers( + conn: mdb.Connection, + species_id: int, + offset: int = 0, + limit: Optional[int] = None +) -> tuple[dict, ...]: + """Retrieve markers from the database.""" + _query = "SELECT * FROM Geno WHERE SpeciesId=%s" + if bool(limit) and limit > 0:# type: ignore[operator] + _query = _query + f" LIMIT {limit} OFFSET {offset}" + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute(_query, (species_id,)) + debug_query(cursor) + return tuple(dict(row) for row in cursor.fetchall()) + + +def genotype_dataset( + conn: mdb.Connection, + species_id: int, + population_id: int, + dataset_id: Optional[int] = None +) -> Optional[dict]: + """Retrieve genotype datasets from the database. + + Apparently, you should only ever have one genotype dataset for a population. + """ + _query = ( + "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset " + "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf " + "ON iset.Id=gf.InbredSetId " + "WHERE s.Id=%s AND iset.Id=%s") + _params = (species_id, population_id) + if bool(dataset_id): + _query = _query + " AND gf.Id=%s" + _params = _params + (dataset_id,)# type: ignore[assignment] + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute(_query, _params) + debug_query(cursor) + result = cursor.fetchone() + if bool(result): + return dict(result) + return None + + +def save_new_dataset( + cursor: Cursor, + population_id: int, + name: str, + fullname: str, + shortname: str +) -> dict: + """Save a new genotype dataset into the database.""" + params = { + "InbredSetId": population_id, + "Name": name, + "FullName": fullname, + "ShortName": shortname, + "CreateTime": datetime.now().date().isoformat(), + "public": 2, + "confidentiality": 0, + "AuthorisedUsers": None + } + cursor.execute( + "INSERT INTO GenoFreeze(" + "Name, FullName, ShortName, CreateTime, public, InbredSetId, " + "confidentiality, AuthorisedUsers" + ") VALUES (" + "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, " + "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s" + ")", + params) + return {**params, "Id": cursor.lastrowid} diff --git a/uploader/genotypes/views.py b/uploader/genotypes/views.py new file mode 100644 index 0000000..0821eca --- /dev/null +++ b/uploader/genotypes/views.py @@ -0,0 +1,204 @@ +"""Views for the genotypes.""" +from MySQLdb.cursors import DictCursor +from flask import (flash, + request, + url_for, + redirect, + Blueprint, + render_template, + current_app as app) + +from uploader.ui import make_template_renderer +from uploader.oauth2.client import oauth2_post +from uploader.authorisation import require_login +from uploader.db_utils import database_connection +from uploader.species.models import all_species, species_by_id +from uploader.monadic_requests import make_either_error_handler +from uploader.request_checks import with_species, with_population +from uploader.datautils import safe_int, order_by_family, enumerate_sequence +from uploader.population.models import (populations_by_species, + population_by_species_and_id) + +from .models import (genotype_markers, + genotype_dataset, + save_new_dataset, + genotype_markers_count, + genocode_by_population) + +genotypesbp = Blueprint("genotypes", __name__) +render_template = make_template_renderer("genotypes") + +@genotypesbp.route("populations/genotypes", methods=["GET"]) +@require_login +def index(): + """Direct entry-point for genotypes.""" + with database_connection(app.config["SQL_URI"]) as conn: + if not bool(request.args.get("species_id")): + return render_template("genotypes/index.html", + species=order_by_family(all_species(conn)), + activelink="genotypes") + species = species_by_id(conn, request.args.get("species_id")) + if not bool(species): + flash(f"Could not find species with ID '{request.args.get('species_id')}'!", + "alert-danger") + return redirect(url_for("species.populations.genotypes.index")) + return redirect(url_for("species.populations.genotypes.select_population", + species_id=species["SpeciesId"])) + + +@genotypesbp.route("/<int:species_id>/populations/genotypes/select-population", + methods=["GET"]) +@require_login +@with_species(redirect_uri="species.populations.genotypes.index") +def select_population(species: dict, species_id: int): + """Select the population under which the genotypes go.""" + with database_connection(app.config["SQL_URI"]) as conn: + if not bool(request.args.get("population_id")): + return render_template("genotypes/select-population.html", + species=species, + populations=order_by_family( + populations_by_species(conn, species_id), + order_key="FamilyOrder"), + activelink="genotypes") + + population = population_by_species_and_id( + conn, species_id, request.args.get("population_id")) + if not bool(population): + flash("Invalid population selected!", "alert-danger") + return redirect(url_for( + "species.populations.genotypes.select_population", + species_id=species_id)) + + return redirect(url_for("species.populations.genotypes.list_genotypes", + species_id=species_id, + population_id=population["Id"])) + + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes", + methods=["GET"]) +@require_login +@with_population(species_redirect_uri="species.populations.genotypes.index", + redirect_uri="species.populations.genotypes.select_population") +def list_genotypes(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument] + """List genotype details for species and population.""" + with database_connection(app.config["SQL_URI"]) as conn: + return render_template("genotypes/list-genotypes.html", + species=species, + population=population, + genocode=genocode_by_population( + conn, population["Id"]), + total_markers=genotype_markers_count( + conn, species["SpeciesId"]), + dataset=genotype_dataset(conn, + species["SpeciesId"], + population["Id"]), + activelink="list-genotypes") + + +@genotypesbp.route("/<int:species_id>/genotypes/list-markers", methods=["GET"]) +@require_login +@with_species(redirect_uri="species.populations.genotypes.index") +def list_markers(species: dict, **kwargs):# pylint: disable=[unused-argument] + """List a species' genetic markers.""" + with database_connection(app.config["SQL_URI"]) as conn: + start_from = max(safe_int(request.args.get("start_from") or 0), 0) + count = safe_int(request.args.get("count") or 20) + return render_template("genotypes/list-markers.html", + species=species, + total_markers=genotype_markers_count( + conn, species["SpeciesId"]), + start_from=start_from, + count=count, + markers=enumerate_sequence( + genotype_markers(conn, + species["SpeciesId"], + offset=start_from, + limit=count), + start=start_from+1), + activelink="list-markers") + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/" + "<int:dataset_id>/view", + methods=["GET"]) +@require_login +def view_dataset(species_id: int, population_id: int, dataset_id: int): + """View details regarding a specific dataset.""" + with database_connection(app.config["SQL_URI"]) as conn: + species = species_by_id(conn, species_id) + if not bool(species): + flash("Invalid species provided!", "alert-danger") + return redirect(url_for("species.populations.genotypes.index")) + + population = population_by_species_and_id( + conn, species_id, population_id) + if not bool(population): + flash("Invalid population selected!", "alert-danger") + return redirect(url_for( + "species.populations.genotypes.select_population", + species_id=species_id)) + + dataset = genotype_dataset(conn, species_id, population_id, dataset_id) + if not bool(dataset): + flash("Could not find such a dataset!", "alert-danger") + return redirect(url_for( + "species.populations.genotypes.list_genotypes", + species_id=species_id, + population_id=population_id)) + + return render_template("genotypes/view-dataset.html", + species=species, + population=population, + dataset=dataset, + activelink="view-dataset") + + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/" + "create", + methods=["GET", "POST"]) +@require_login +@with_population(species_redirect_uri="species.populations.genotypes.index", + redirect_uri="species.populations.genotypes.select_population") +def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument] + """Create a genotype dataset.""" + with (database_connection(app.config["SQL_URI"]) as conn, + conn.cursor(cursorclass=DictCursor) as cursor): + if request.method == "GET": + return render_template("genotypes/create-dataset.html", + species=species, + population=population, + activelink="create-dataset") + + form = request.form + new_dataset = save_new_dataset( + cursor, + population["Id"], + form["geno-dataset-name"], + form["geno-dataset-fullname"], + form["geno-dataset-shortname"]) + + def __success__(_success): + flash("Successfully created genotype dataset.", "alert-success") + return redirect(url_for( + "species.populations.genotypes.list_genotypes", + species_id=species["SpeciesId"], + population_id=population["Id"])) + + return oauth2_post( + "auth/resource/genotypes/create", + json={ + **dict(request.form), + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": new_dataset["Id"], + "dataset_name": form["geno-dataset-name"], + "dataset_fullname": form["geno-dataset-fullname"], + "dataset_shortname": form["geno-dataset-shortname"], + "public": "on" + } + ).either( + make_either_error_handler( + "There was an error creating the genotype dataset."), + __success__) |