diff options
Diffstat (limited to 'uploader/genotypes')
| -rw-r--r-- | uploader/genotypes/__init__.py | 1 | ||||
| -rw-r--r-- | uploader/genotypes/models.py | 114 | ||||
| -rw-r--r-- | uploader/genotypes/views.py | 178 |
3 files changed, 293 insertions, 0 deletions
diff --git a/uploader/genotypes/__init__.py b/uploader/genotypes/__init__.py new file mode 100644 index 0000000..d0025d6 --- /dev/null +++ b/uploader/genotypes/__init__.py @@ -0,0 +1 @@ +"""The Genotypes module.""" diff --git a/uploader/genotypes/models.py b/uploader/genotypes/models.py new file mode 100644 index 0000000..34d2cfe --- /dev/null +++ b/uploader/genotypes/models.py @@ -0,0 +1,114 @@ +"""Functions for handling genotypes.""" +from typing import Optional +from datetime import datetime + +import MySQLdb as mdb +from MySQLdb.cursors import Cursor, DictCursor +from flask import current_app as app + +from gn_libs.mysqldb import debug_query + +def genocode_by_population( + conn: mdb.Connection, population_id: int) -> tuple[dict, ...]: + """Get the allele/genotype codes.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s", + (population_id,)) + return tuple(dict(item) for item in cursor.fetchall()) + + +def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int: + """Find the total count of the genotype markers for a species.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s", + (species_id,)) + return int(cursor.fetchone()["markers_count"]) + + +def genotype_markers( + conn: mdb.Connection, + species_id: int, + offset: int = 0, + limit: Optional[int] = None +) -> tuple[tuple[dict, ...], int]: + """Retrieve markers from the database.""" + _query_template = ( + "SELECT %%COLS%% FROM Geno AS gno " + "WHERE gno.SpeciesId=%s " + "%%LIMIT%%") + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + _query_template.replace("%%LIMIT%%", "").replace( + "%%COLS%%", "COUNT(gno.Id) AS total_records"), + (species_id,)) + _total_records = cursor.fetchone()["total_records"] + cursor.execute( + _query_template.replace("%%COLS%%", "gno.*").replace( + "%%LIMIT%%", + (f"LIMIT {int(limit)} OFFSET {int(offset)}" + if bool(limit) and limit > 0 + else "")), + (species_id,)) + debug_query(cursor, app.logger) + return tuple(dict(row) for row in cursor.fetchall()), _total_records + + +def genotype_dataset( + conn: mdb.Connection, + species_id: int, + population_id: int, + dataset_id: Optional[int] = None +) -> Optional[dict]: + """Retrieve genotype datasets from the database. + + Apparently, you should only ever have one genotype dataset for a population. + """ + _query = ( + "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset " + "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf " + "ON iset.Id=gf.InbredSetId " + "WHERE s.Id=%s AND iset.Id=%s") + _params = (species_id, population_id) + if bool(dataset_id): + _query = _query + " AND gf.Id=%s" + _params = _params + (dataset_id,)# type: ignore[assignment] + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute(_query, _params) + debug_query(cursor, app.logger) + result = cursor.fetchone() + if bool(result): + return dict(result) + return None + + +def save_new_dataset( + cursor: Cursor, + population_id: int, + name: str, + fullname: str, + shortname: str +) -> dict: + """Save a new genotype dataset into the database.""" + params = { + "InbredSetId": population_id, + "Name": name, + "FullName": fullname, + "ShortName": shortname, + "CreateTime": datetime.now().date().isoformat(), + "public": 2, + "confidentiality": 0, + "AuthorisedUsers": None + } + cursor.execute( + "INSERT INTO GenoFreeze(" + "Name, FullName, ShortName, CreateTime, public, InbredSetId, " + "confidentiality, AuthorisedUsers" + ") VALUES (" + "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, " + "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s" + ")", + params) + return {**params, "Id": cursor.lastrowid} diff --git a/uploader/genotypes/views.py b/uploader/genotypes/views.py new file mode 100644 index 0000000..f27671c --- /dev/null +++ b/uploader/genotypes/views.py @@ -0,0 +1,178 @@ +"""Views for the genotypes.""" +import logging + +from MySQLdb.cursors import DictCursor +from pymonad.either import Left, Right, Either +from gn_libs.mysqldb import database_connection +from flask import (flash, + request, + jsonify, + redirect, + Blueprint, + render_template, + current_app as app) + +from uploader.flask_extensions import url_for +from uploader.ui import make_template_renderer +from uploader.oauth2.client import oauth2_post +from uploader.authorisation import require_login +from uploader.route_utils import generic_select_population +from uploader.datautils import safe_int, enumerate_sequence +from uploader.species.models import all_species, species_by_id +from uploader.monadic_requests import make_either_error_handler +from uploader.population.models import population_by_species_and_id +from uploader.request_checks import with_species, with_dataset, with_population + +from .models import (genotype_markers, + genotype_dataset, + save_new_dataset, + genotype_markers_count, + genocode_by_population) + +logger = logging.getLogger(__name__) +genotypesbp = Blueprint("genotypes", __name__) +render_template = make_template_renderer("genotypes") + + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes", + methods=["GET"]) +@require_login +@with_population(species_redirect_uri="species.list_species", + redirect_uri="species.populations.list_species_populations") +def list_genotypes(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument] + """List genotype details for species and population.""" + with database_connection(app.config["SQL_URI"]) as conn: + return render_template("genotypes/list-genotypes.html", + species=species, + population=population, + genocode=genocode_by_population( + conn, population["Id"]), + total_markers=genotype_markers_count( + conn, species["SpeciesId"]), + dataset=genotype_dataset(conn, + species["SpeciesId"], + population["Id"]), + activelink="list-genotypes") + + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes/<int:dataset_id>/list-markers", + methods=["GET"]) +@require_login +@with_species(redirect_uri="species.populations.genotypes.list_genotypes") +def list_markers(species: dict, **_kwargs): + """List the markers that exist for this species.""" + args = request.args + offset = int(args.get("start") or 0) + with database_connection(app.config["SQL_URI"]) as conn: + markers, total_records = genotype_markers( + conn, + species["SpeciesId"], + offset=offset, + limit=int(args.get("length") or 0)) + return jsonify({ + **({"draw": int(args.get("draw"))} + if bool(args.get("draw") or False) + else {}), + "recordsTotal": total_records, + "recordsFiltered": len(markers), + "markers": tuple({**marker, "index": idx} + for idx, marker in + enumerate(markers, start=offset+1)) + }) + + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/" + "<int:dataset_id>/view", + methods=["GET"]) +@require_login +def view_dataset(species_id: int, population_id: int, dataset_id: int): + """View details regarding a specific dataset.""" + with database_connection(app.config["SQL_URI"]) as conn: + species = species_by_id(conn, species_id) + if not bool(species): + flash("Invalid species provided!", "alert-danger") + return redirect(url_for("species.list_species")) + + population = population_by_species_and_id( + conn, species_id, population_id) + if not bool(population): + flash("Invalid population selected!", "alert-danger") + return redirect(url_for( + "species.populations.list_species_populations", + species_id=species_id)) + + dataset = genotype_dataset(conn, species_id, population_id, dataset_id) + if not bool(dataset): + flash("Could not find such a dataset!", "alert-danger") + return redirect(url_for( + "species.populations.genotypes.list_genotypes", + species_id=species_id, + population_id=population_id)) + + return render_template("genotypes/view-dataset.html", + species=species, + population=population, + dataset=dataset, + activelink="view-dataset") + + +@genotypesbp.route( + "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/" + "create", + methods=["GET", "POST"]) +@require_login +@with_population(species_redirect_uri="species.list_species", + redirect_uri="species.populations.list_species_populations") +def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument] + """Create a genotype dataset.""" + if request.method == "GET": + return render_template("genotypes/create-dataset.html", + species=species, + population=population, + activelink="create-dataset") + + with (database_connection(app.config["SQL_URI"]) as conn, + conn.cursor(cursorclass=DictCursor) as cursor): + + def __save_dataset__() -> Either: + form = request.form + try: + return Right(save_new_dataset( + cursor, + population["Id"], + form["geno-dataset-name"], + form["geno-dataset-fullname"], + form["geno-dataset-shortname"])) + except Exception: + msg = "Error adding new Genotype dataset to database." + logger.error(msg, exc_info=True) + return Left(Exception(msg)) + + def __success__(_success): + flash("Successfully created genotype dataset.", "alert-success") + return redirect(url_for( + "species.populations.genotypes.list_genotypes", + species_id=species["SpeciesId"], + population_id=population["Id"])) + + return __save_dataset__().then( + lambda new_dataset: oauth2_post( + "auth/resource/genotypes/create", + json={ + **dict(request.form), + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": new_dataset["Id"], + "dataset_name": new_dataset["Name"], + "dataset_fullname": new_dataset["FullName"], + "dataset_shortname": new_dataset["ShortName"], + "public": "on" + } + ) + ).either( + make_either_error_handler( + "There was an error creating the genotype dataset."), + __success__) |
