aboutsummaryrefslogtreecommitdiff
path: root/uploader/genotypes
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/genotypes')
-rw-r--r--uploader/genotypes/__init__.py1
-rw-r--r--uploader/genotypes/models.py101
-rw-r--r--uploader/genotypes/views.py204
3 files changed, 306 insertions, 0 deletions
diff --git a/uploader/genotypes/__init__.py b/uploader/genotypes/__init__.py
new file mode 100644
index 0000000..d0025d6
--- /dev/null
+++ b/uploader/genotypes/__init__.py
@@ -0,0 +1 @@
+"""The Genotypes module."""
diff --git a/uploader/genotypes/models.py b/uploader/genotypes/models.py
new file mode 100644
index 0000000..44c98b1
--- /dev/null
+++ b/uploader/genotypes/models.py
@@ -0,0 +1,101 @@
+"""Functions for handling genotypes."""
+from typing import Optional
+from datetime import datetime
+
+import MySQLdb as mdb
+from MySQLdb.cursors import Cursor, DictCursor
+
+from uploader.db_utils import debug_query
+
+def genocode_by_population(
+ conn: mdb.Connection, population_id: int) -> tuple[dict, ...]:
+ """Get the allele/genotype codes."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s",
+ (population_id,))
+ return tuple(dict(item) for item in cursor.fetchall())
+
+
+def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int:
+ """Find the total count of the genotype markers for a species."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s",
+ (species_id,))
+ return int(cursor.fetchone()["markers_count"])
+
+
+def genotype_markers(
+ conn: mdb.Connection,
+ species_id: int,
+ offset: int = 0,
+ limit: Optional[int] = None
+) -> tuple[dict, ...]:
+ """Retrieve markers from the database."""
+ _query = "SELECT * FROM Geno WHERE SpeciesId=%s"
+ if bool(limit) and limit > 0:# type: ignore[operator]
+ _query = _query + f" LIMIT {limit} OFFSET {offset}"
+
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(_query, (species_id,))
+ debug_query(cursor)
+ return tuple(dict(row) for row in cursor.fetchall())
+
+
+def genotype_dataset(
+ conn: mdb.Connection,
+ species_id: int,
+ population_id: int,
+ dataset_id: Optional[int] = None
+) -> Optional[dict]:
+ """Retrieve genotype datasets from the database.
+
+ Apparently, you should only ever have one genotype dataset for a population.
+ """
+ _query = (
+ "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset "
+ "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf "
+ "ON iset.Id=gf.InbredSetId "
+ "WHERE s.Id=%s AND iset.Id=%s")
+ _params = (species_id, population_id)
+ if bool(dataset_id):
+ _query = _query + " AND gf.Id=%s"
+ _params = _params + (dataset_id,)# type: ignore[assignment]
+
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(_query, _params)
+ debug_query(cursor)
+ result = cursor.fetchone()
+ if bool(result):
+ return dict(result)
+ return None
+
+
+def save_new_dataset(
+ cursor: Cursor,
+ population_id: int,
+ name: str,
+ fullname: str,
+ shortname: str
+) -> dict:
+ """Save a new genotype dataset into the database."""
+ params = {
+ "InbredSetId": population_id,
+ "Name": name,
+ "FullName": fullname,
+ "ShortName": shortname,
+ "CreateTime": datetime.now().date().isoformat(),
+ "public": 2,
+ "confidentiality": 0,
+ "AuthorisedUsers": None
+ }
+ cursor.execute(
+ "INSERT INTO GenoFreeze("
+ "Name, FullName, ShortName, CreateTime, public, InbredSetId, "
+ "confidentiality, AuthorisedUsers"
+ ") VALUES ("
+ "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, "
+ "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s"
+ ")",
+ params)
+ return {**params, "Id": cursor.lastrowid}
diff --git a/uploader/genotypes/views.py b/uploader/genotypes/views.py
new file mode 100644
index 0000000..0821eca
--- /dev/null
+++ b/uploader/genotypes/views.py
@@ -0,0 +1,204 @@
+"""Views for the genotypes."""
+from MySQLdb.cursors import DictCursor
+from flask import (flash,
+ request,
+ url_for,
+ redirect,
+ Blueprint,
+ render_template,
+ current_app as app)
+
+from uploader.ui import make_template_renderer
+from uploader.oauth2.client import oauth2_post
+from uploader.authorisation import require_login
+from uploader.db_utils import database_connection
+from uploader.species.models import all_species, species_by_id
+from uploader.monadic_requests import make_either_error_handler
+from uploader.request_checks import with_species, with_population
+from uploader.datautils import safe_int, order_by_family, enumerate_sequence
+from uploader.population.models import (populations_by_species,
+ population_by_species_and_id)
+
+from .models import (genotype_markers,
+ genotype_dataset,
+ save_new_dataset,
+ genotype_markers_count,
+ genocode_by_population)
+
+genotypesbp = Blueprint("genotypes", __name__)
+render_template = make_template_renderer("genotypes")
+
+@genotypesbp.route("populations/genotypes", methods=["GET"])
+@require_login
+def index():
+ """Direct entry-point for genotypes."""
+ with database_connection(app.config["SQL_URI"]) as conn:
+ if not bool(request.args.get("species_id")):
+ return render_template("genotypes/index.html",
+ species=order_by_family(all_species(conn)),
+ activelink="genotypes")
+ species = species_by_id(conn, request.args.get("species_id"))
+ if not bool(species):
+ flash(f"Could not find species with ID '{request.args.get('species_id')}'!",
+ "alert-danger")
+ return redirect(url_for("species.populations.genotypes.index"))
+ return redirect(url_for("species.populations.genotypes.select_population",
+ species_id=species["SpeciesId"]))
+
+
+@genotypesbp.route("/<int:species_id>/populations/genotypes/select-population",
+ methods=["GET"])
+@require_login
+@with_species(redirect_uri="species.populations.genotypes.index")
+def select_population(species: dict, species_id: int):
+ """Select the population under which the genotypes go."""
+ with database_connection(app.config["SQL_URI"]) as conn:
+ if not bool(request.args.get("population_id")):
+ return render_template("genotypes/select-population.html",
+ species=species,
+ populations=order_by_family(
+ populations_by_species(conn, species_id),
+ order_key="FamilyOrder"),
+ activelink="genotypes")
+
+ population = population_by_species_and_id(
+ conn, species_id, request.args.get("population_id"))
+ if not bool(population):
+ flash("Invalid population selected!", "alert-danger")
+ return redirect(url_for(
+ "species.populations.genotypes.select_population",
+ species_id=species_id))
+
+ return redirect(url_for("species.populations.genotypes.list_genotypes",
+ species_id=species_id,
+ population_id=population["Id"]))
+
+
+@genotypesbp.route(
+ "/<int:species_id>/populations/<int:population_id>/genotypes",
+ methods=["GET"])
+@require_login
+@with_population(species_redirect_uri="species.populations.genotypes.index",
+ redirect_uri="species.populations.genotypes.select_population")
+def list_genotypes(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
+ """List genotype details for species and population."""
+ with database_connection(app.config["SQL_URI"]) as conn:
+ return render_template("genotypes/list-genotypes.html",
+ species=species,
+ population=population,
+ genocode=genocode_by_population(
+ conn, population["Id"]),
+ total_markers=genotype_markers_count(
+ conn, species["SpeciesId"]),
+ dataset=genotype_dataset(conn,
+ species["SpeciesId"],
+ population["Id"]),
+ activelink="list-genotypes")
+
+
+@genotypesbp.route("/<int:species_id>/genotypes/list-markers", methods=["GET"])
+@require_login
+@with_species(redirect_uri="species.populations.genotypes.index")
+def list_markers(species: dict, **kwargs):# pylint: disable=[unused-argument]
+ """List a species' genetic markers."""
+ with database_connection(app.config["SQL_URI"]) as conn:
+ start_from = max(safe_int(request.args.get("start_from") or 0), 0)
+ count = safe_int(request.args.get("count") or 20)
+ return render_template("genotypes/list-markers.html",
+ species=species,
+ total_markers=genotype_markers_count(
+ conn, species["SpeciesId"]),
+ start_from=start_from,
+ count=count,
+ markers=enumerate_sequence(
+ genotype_markers(conn,
+ species["SpeciesId"],
+ offset=start_from,
+ limit=count),
+ start=start_from+1),
+ activelink="list-markers")
+
+@genotypesbp.route(
+ "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/"
+ "<int:dataset_id>/view",
+ methods=["GET"])
+@require_login
+def view_dataset(species_id: int, population_id: int, dataset_id: int):
+ """View details regarding a specific dataset."""
+ with database_connection(app.config["SQL_URI"]) as conn:
+ species = species_by_id(conn, species_id)
+ if not bool(species):
+ flash("Invalid species provided!", "alert-danger")
+ return redirect(url_for("species.populations.genotypes.index"))
+
+ population = population_by_species_and_id(
+ conn, species_id, population_id)
+ if not bool(population):
+ flash("Invalid population selected!", "alert-danger")
+ return redirect(url_for(
+ "species.populations.genotypes.select_population",
+ species_id=species_id))
+
+ dataset = genotype_dataset(conn, species_id, population_id, dataset_id)
+ if not bool(dataset):
+ flash("Could not find such a dataset!", "alert-danger")
+ return redirect(url_for(
+ "species.populations.genotypes.list_genotypes",
+ species_id=species_id,
+ population_id=population_id))
+
+ return render_template("genotypes/view-dataset.html",
+ species=species,
+ population=population,
+ dataset=dataset,
+ activelink="view-dataset")
+
+
+@genotypesbp.route(
+ "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/"
+ "create",
+ methods=["GET", "POST"])
+@require_login
+@with_population(species_redirect_uri="species.populations.genotypes.index",
+ redirect_uri="species.populations.genotypes.select_population")
+def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
+ """Create a genotype dataset."""
+ with (database_connection(app.config["SQL_URI"]) as conn,
+ conn.cursor(cursorclass=DictCursor) as cursor):
+ if request.method == "GET":
+ return render_template("genotypes/create-dataset.html",
+ species=species,
+ population=population,
+ activelink="create-dataset")
+
+ form = request.form
+ new_dataset = save_new_dataset(
+ cursor,
+ population["Id"],
+ form["geno-dataset-name"],
+ form["geno-dataset-fullname"],
+ form["geno-dataset-shortname"])
+
+ def __success__(_success):
+ flash("Successfully created genotype dataset.", "alert-success")
+ return redirect(url_for(
+ "species.populations.genotypes.list_genotypes",
+ species_id=species["SpeciesId"],
+ population_id=population["Id"]))
+
+ return oauth2_post(
+ "auth/resource/genotypes/create",
+ json={
+ **dict(request.form),
+ "species_id": species["SpeciesId"],
+ "population_id": population["Id"],
+ "dataset_id": new_dataset["Id"],
+ "dataset_name": form["geno-dataset-name"],
+ "dataset_fullname": form["geno-dataset-fullname"],
+ "dataset_shortname": form["geno-dataset-shortname"],
+ "public": "on"
+ }
+ ).either(
+ make_either_error_handler(
+ "There was an error creating the genotype dataset."),
+ __success__)