diff options
Diffstat (limited to 'uploader/genotypes')
| -rw-r--r-- | uploader/genotypes/models.py | 114 | ||||
| -rw-r--r-- | uploader/genotypes/views.py | 187 |
2 files changed, 259 insertions, 42 deletions
diff --git a/uploader/genotypes/models.py b/uploader/genotypes/models.py new file mode 100644 index 0000000..34d2cfe --- /dev/null +++ b/uploader/genotypes/models.py @@ -0,0 +1,114 @@ +"""Functions for handling genotypes.""" +from typing import Optional +from datetime import datetime + +import MySQLdb as mdb +from MySQLdb.cursors import Cursor, DictCursor +from flask import current_app as app + +from gn_libs.mysqldb import debug_query + +def genocode_by_population( + conn: mdb.Connection, population_id: int) -> tuple[dict, ...]: + """Get the allele/genotype codes.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s", + (population_id,)) + return tuple(dict(item) for item in cursor.fetchall()) + + +def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int: + """Find the total count of the genotype markers for a species.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s", + (species_id,)) + return int(cursor.fetchone()["markers_count"]) + + +def genotype_markers( + conn: mdb.Connection, + species_id: int, + offset: int = 0, + limit: Optional[int] = None +) -> tuple[tuple[dict, ...], int]: + """Retrieve markers from the database.""" + _query_template = ( + "SELECT %%COLS%% FROM Geno AS gno " + "WHERE gno.SpeciesId=%s " + "%%LIMIT%%") + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + _query_template.replace("%%LIMIT%%", "").replace( + "%%COLS%%", "COUNT(gno.Id) AS total_records"), + (species_id,)) + _total_records = cursor.fetchone()["total_records"] + cursor.execute( + _query_template.replace("%%COLS%%", "gno.*").replace( + "%%LIMIT%%", + (f"LIMIT {int(limit)} OFFSET {int(offset)}" + if bool(limit) and limit > 0 + else "")), + (species_id,)) + debug_query(cursor, app.logger) + return tuple(dict(row) for row in cursor.fetchall()), _total_records + + +def genotype_dataset( + conn: mdb.Connection, + species_id: int, + population_id: int, + dataset_id: Optional[int] = None +) -> Optional[dict]: + """Retrieve genotype datasets from the database. + + Apparently, you should only ever have one genotype dataset for a population. + """ + _query = ( + "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset " + "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf " + "ON iset.Id=gf.InbredSetId " + "WHERE s.Id=%s AND iset.Id=%s") + _params = (species_id, population_id) + if bool(dataset_id): + _query = _query + " AND gf.Id=%s" + _params = _params + (dataset_id,)# type: ignore[assignment] + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute(_query, _params) + debug_query(cursor, app.logger) + result = cursor.fetchone() + if bool(result): + return dict(result) + return None + + +def save_new_dataset( + cursor: Cursor, + population_id: int, + name: str, + fullname: str, + shortname: str +) -> dict: + """Save a new genotype dataset into the database.""" + params = { + "InbredSetId": population_id, + "Name": name, + "FullName": fullname, + "ShortName": shortname, + "CreateTime": datetime.now().date().isoformat(), + "public": 2, + "confidentiality": 0, + "AuthorisedUsers": None + } + cursor.execute( + "INSERT INTO GenoFreeze(" + "Name, FullName, ShortName, CreateTime, public, InbredSetId, " + "confidentiality, AuthorisedUsers" + ") VALUES (" + "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, " + "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s" + ")", + params) + return {**params, "Id": cursor.lastrowid} diff --git a/uploader/genotypes/views.py b/uploader/genotypes/views.py index 885e008..f27671c 100644 --- a/uploader/genotypes/views.py +++ b/uploader/genotypes/views.py @@ -1,75 +1,178 @@ """Views for the genotypes.""" +import logging + +from MySQLdb.cursors import DictCursor +from pymonad.either import Left, Right, Either +from gn_libs.mysqldb import database_connection from flask import (flash, request, - url_for, + jsonify, redirect, Blueprint, render_template, current_app as app) -from uploader.datautils import order_by_family +from uploader.flask_extensions import url_for +from uploader.ui import make_template_renderer +from uploader.oauth2.client import oauth2_post from uploader.authorisation import require_login -from uploader.db_utils import database_connection +from uploader.route_utils import generic_select_population +from uploader.datautils import safe_int, enumerate_sequence from uploader.species.models import all_species, species_by_id -from uploader.population.models import (populations_by_species, - population_by_species_and_id) +from uploader.monadic_requests import make_either_error_handler +from uploader.population.models import population_by_species_and_id +from uploader.request_checks import with_species, with_dataset, with_population + +from .models import (genotype_markers, + genotype_dataset, + save_new_dataset, + genotype_markers_count, + genocode_by_population) +logger = logging.getLogger(__name__) genotypesbp = Blueprint("genotypes", __name__) +render_template = make_template_renderer("genotypes") -@genotypesbp.route("populations/genotypes", methods=["GET"]) + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes", + methods=["GET"]) @require_login -def index(): - """Direct entry-point for genotypes.""" +@with_population(species_redirect_uri="species.list_species", + redirect_uri="species.populations.list_species_populations") +def list_genotypes(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument] + """List genotype details for species and population.""" with database_connection(app.config["SQL_URI"]) as conn: - if not bool(request.args.get("species_id")): - return render_template("genotypes/index.html", - species=order_by_family(all_species(conn)), - activelink="genotypes") - species = species_by_id(conn, request.args.get("species_id")) - if not bool(species): - flash(f"Could not find species with ID '{request.args.get('species_id')}'!", - "alert-danger") - return redirect(url_for("species.populations.genotypes.index")) - return redirect(url_for("species.populations.genotypes.select_population", - species_id=species["SpeciesId"])) + return render_template("genotypes/list-genotypes.html", + species=species, + population=population, + genocode=genocode_by_population( + conn, population["Id"]), + total_markers=genotype_markers_count( + conn, species["SpeciesId"]), + dataset=genotype_dataset(conn, + species["SpeciesId"], + population["Id"]), + activelink="list-genotypes") -@genotypesbp.route("/<int:species_id>/populations/genotypes/select-population", - methods=["GET"]) +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes/<int:dataset_id>/list-markers", + methods=["GET"]) @require_login -def select_population(species_id: int): - """Select the population under which the genotypes go.""" +@with_species(redirect_uri="species.populations.genotypes.list_genotypes") +def list_markers(species: dict, **_kwargs): + """List the markers that exist for this species.""" + args = request.args + offset = int(args.get("start") or 0) + with database_connection(app.config["SQL_URI"]) as conn: + markers, total_records = genotype_markers( + conn, + species["SpeciesId"], + offset=offset, + limit=int(args.get("length") or 0)) + return jsonify({ + **({"draw": int(args.get("draw"))} + if bool(args.get("draw") or False) + else {}), + "recordsTotal": total_records, + "recordsFiltered": len(markers), + "markers": tuple({**marker, "index": idx} + for idx, marker in + enumerate(markers, start=offset+1)) + }) + + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/" + "<int:dataset_id>/view", + methods=["GET"]) +@require_login +def view_dataset(species_id: int, population_id: int, dataset_id: int): + """View details regarding a specific dataset.""" with database_connection(app.config["SQL_URI"]) as conn: species = species_by_id(conn, species_id) if not bool(species): flash("Invalid species provided!", "alert-danger") - return redirect(url_for("species.populations.genotypes.index")) - - if not bool(request.args.get("population_id")): - return render_template("genotypes/select-population.html", - species=species, - populations=order_by_family( - populations_by_species(conn, species_id), - order_key="FamilyOrder"), - activelink="genotypes") + return redirect(url_for("species.list_species")) population = population_by_species_and_id( - conn, species_id, request.args.get("population_id")) + conn, species_id, population_id) if not bool(population): flash("Invalid population selected!", "alert-danger") return redirect(url_for( - "species.populations.genotypes.select_population", + "species.populations.list_species_populations", species_id=species_id)) - return redirect(url_for("species.populations.genotypes.list_genotypes", - species_id=species_id, - population_id=population["Id"])) + dataset = genotype_dataset(conn, species_id, population_id, dataset_id) + if not bool(dataset): + flash("Could not find such a dataset!", "alert-danger") + return redirect(url_for( + "species.populations.genotypes.list_genotypes", + species_id=species_id, + population_id=population_id)) + + return render_template("genotypes/view-dataset.html", + species=species, + population=population, + dataset=dataset, + activelink="view-dataset") @genotypesbp.route( - "/<int:species_id>/populations/<int:population_id>/genotypes", - methods=["GET"]) + "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/" + "create", + methods=["GET", "POST"]) @require_login -def list_genotypes(species_id: int, population_id: int): - """List genotype details for species and population.""" - return f"Would list geno info for population {population_id} from species {species_id}" +@with_population(species_redirect_uri="species.list_species", + redirect_uri="species.populations.list_species_populations") +def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument] + """Create a genotype dataset.""" + if request.method == "GET": + return render_template("genotypes/create-dataset.html", + species=species, + population=population, + activelink="create-dataset") + + with (database_connection(app.config["SQL_URI"]) as conn, + conn.cursor(cursorclass=DictCursor) as cursor): + + def __save_dataset__() -> Either: + form = request.form + try: + return Right(save_new_dataset( + cursor, + population["Id"], + form["geno-dataset-name"], + form["geno-dataset-fullname"], + form["geno-dataset-shortname"])) + except Exception: + msg = "Error adding new Genotype dataset to database." + logger.error(msg, exc_info=True) + return Left(Exception(msg)) + + def __success__(_success): + flash("Successfully created genotype dataset.", "alert-success") + return redirect(url_for( + "species.populations.genotypes.list_genotypes", + species_id=species["SpeciesId"], + population_id=population["Id"])) + + return __save_dataset__().then( + lambda new_dataset: oauth2_post( + "auth/resource/genotypes/create", + json={ + **dict(request.form), + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": new_dataset["Id"], + "dataset_name": new_dataset["Name"], + "dataset_fullname": new_dataset["FullName"], + "dataset_shortname": new_dataset["ShortName"], + "public": "on" + } + ) + ).either( + make_either_error_handler( + "There was an error creating the genotype dataset."), + __success__) |
