about summary refs log tree commit diff
path: root/uploader/genotypes
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/genotypes')
-rw-r--r--uploader/genotypes/__init__.py1
-rw-r--r--uploader/genotypes/models.py102
-rw-r--r--uploader/genotypes/views.py207
3 files changed, 310 insertions, 0 deletions
diff --git a/uploader/genotypes/__init__.py b/uploader/genotypes/__init__.py
new file mode 100644
index 0000000..d0025d6
--- /dev/null
+++ b/uploader/genotypes/__init__.py
@@ -0,0 +1 @@
+"""The Genotypes module."""
diff --git a/uploader/genotypes/models.py b/uploader/genotypes/models.py
new file mode 100644
index 0000000..4c3e634
--- /dev/null
+++ b/uploader/genotypes/models.py
@@ -0,0 +1,102 @@
+"""Functions for handling genotypes."""
+from typing import Optional
+from datetime import datetime
+
+import MySQLdb as mdb
+from MySQLdb.cursors import Cursor, DictCursor
+from flask import current_app as app
+
+from gn_libs.mysqldb import debug_query
+
+def genocode_by_population(
+        conn: mdb.Connection, population_id: int) -> tuple[dict, ...]:
+    """Get the allele/genotype codes."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s",
+                       (population_id,))
+        return tuple(dict(item) for item in cursor.fetchall())
+
+
+def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int:
+    """Find the total count of the genotype markers for a species."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(
+            "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s",
+            (species_id,))
+        return int(cursor.fetchone()["markers_count"])
+
+
+def genotype_markers(
+        conn: mdb.Connection,
+        species_id: int,
+        offset: int = 0,
+        limit: Optional[int] = None
+) -> tuple[dict, ...]:
+    """Retrieve markers from the database."""
+    _query = "SELECT * FROM Geno WHERE SpeciesId=%s"
+    if bool(limit) and limit > 0:# type: ignore[operator]
+        _query = _query + f" LIMIT {limit} OFFSET {offset}"
+
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(_query, (species_id,))
+        debug_query(cursor, app.logger)
+        return tuple(dict(row) for row in cursor.fetchall())
+
+
+def genotype_dataset(
+        conn: mdb.Connection,
+        species_id: int,
+        population_id: int,
+        dataset_id: Optional[int] = None
+) -> Optional[dict]:
+    """Retrieve genotype datasets from the database.
+
+    Apparently, you should only ever have one genotype dataset for a population.
+    """
+    _query = (
+        "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset "
+        "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf "
+        "ON iset.Id=gf.InbredSetId "
+        "WHERE s.Id=%s AND iset.Id=%s")
+    _params = (species_id, population_id)
+    if bool(dataset_id):
+        _query = _query + " AND gf.Id=%s"
+        _params = _params + (dataset_id,)# type: ignore[assignment]
+
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(_query, _params)
+        debug_query(cursor, app.logger)
+        result = cursor.fetchone()
+        if bool(result):
+            return dict(result)
+        return None
+
+
+def save_new_dataset(
+        cursor: Cursor,
+        population_id: int,
+        name: str,
+        fullname: str,
+        shortname: str
+) -> dict:
+    """Save a new genotype dataset into the database."""
+    params = {
+        "InbredSetId": population_id,
+        "Name": name,
+        "FullName": fullname,
+        "ShortName": shortname,
+        "CreateTime": datetime.now().date().isoformat(),
+        "public": 2,
+        "confidentiality": 0,
+        "AuthorisedUsers": None
+    }
+    cursor.execute(
+        "INSERT INTO GenoFreeze("
+        "Name, FullName, ShortName, CreateTime, public, InbredSetId, "
+        "confidentiality, AuthorisedUsers"
+        ") VALUES ("
+        "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, "
+        "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s"
+        ")",
+        params)
+    return {**params, "Id": cursor.lastrowid}
diff --git a/uploader/genotypes/views.py b/uploader/genotypes/views.py
new file mode 100644
index 0000000..54c2444
--- /dev/null
+++ b/uploader/genotypes/views.py
@@ -0,0 +1,207 @@
+"""Views for the genotypes."""
+from MySQLdb.cursors import DictCursor
+from gn_libs.mysqldb import database_connection
+from flask import (flash,
+                   request,
+                   url_for,
+                   redirect,
+                   Blueprint,
+                   render_template,
+                   current_app as app)
+
+from uploader.ui import make_template_renderer
+from uploader.oauth2.client import oauth2_post
+from uploader.authorisation import require_login
+from uploader.route_utils import generic_select_population
+from uploader.datautils import safe_int, enumerate_sequence
+from uploader.species.models import all_species, species_by_id
+from uploader.monadic_requests import make_either_error_handler
+from uploader.request_checks import with_species, with_population
+from uploader.population.models import population_by_species_and_id
+
+from .models import (genotype_markers,
+                     genotype_dataset,
+                     save_new_dataset,
+                     genotype_markers_count,
+                     genocode_by_population)
+
+genotypesbp = Blueprint("genotypes", __name__)
+render_template = make_template_renderer("genotypes")
+
+@genotypesbp.route("populations/genotypes", methods=["GET"])
+@require_login
+def index():
+    """Direct entry-point for genotypes."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        if not bool(request.args.get("species_id")):
+            return render_template("genotypes/index.html",
+                                   species=all_species(conn),
+                                   activelink="genotypes")
+
+        species_id = request.args.get("species_id")
+        if species_id == "CREATE-SPECIES":
+            return redirect(url_for(
+                "species.create_species",
+                return_to="species.populations.genotypes.select_population"))
+
+        species = species_by_id(conn, request.args.get("species_id"))
+        if not bool(species):
+            flash(f"Could not find species with ID '{request.args.get('species_id')}'!",
+                  "alert-danger")
+            return redirect(url_for("species.populations.genotypes.index"))
+        return redirect(url_for("species.populations.genotypes.select_population",
+                                species_id=species["SpeciesId"]))
+
+
+@genotypesbp.route("/<int:species_id>/populations/genotypes/select-population",
+                   methods=["GET"])
+@require_login
+@with_species(redirect_uri="species.populations.genotypes.index")
+def select_population(species: dict, species_id: int):# pylint: disable=[unused-argument]
+    """Select the population under which the genotypes go."""
+    return generic_select_population(
+        species,
+        "genotypes/select-population.html",
+        request.args.get("population_id") or "",
+        "species.populations.genotypes.select_population",
+        "species.populations.genotypes.list_genotypes",
+        "genotypes",
+        "Invalid population selected!")
+
+
+@genotypesbp.route(
+    "/<int:species_id>/populations/<int:population_id>/genotypes",
+    methods=["GET"])
+@require_login
+@with_population(species_redirect_uri="species.populations.genotypes.index",
+                 redirect_uri="species.populations.genotypes.select_population")
+def list_genotypes(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
+    """List genotype details for species and population."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        return render_template("genotypes/list-genotypes.html",
+                               species=species,
+                               population=population,
+                               genocode=genocode_by_population(
+                                   conn, population["Id"]),
+                               total_markers=genotype_markers_count(
+                                   conn, species["SpeciesId"]),
+                               dataset=genotype_dataset(conn,
+                                                        species["SpeciesId"],
+                                                        population["Id"]),
+                               activelink="list-genotypes")
+
+
+@genotypesbp.route(
+    "/<int:species_id>/populations/<int:population_id>/genotypes/list-markers",
+    methods=["GET"])
+@require_login
+@with_population(species_redirect_uri="species.populations.genotypes.index",
+                 redirect_uri="species.populations.genotypes.select_population")
+def list_markers(
+        species: dict,
+        population: dict,
+        **kwargs
+):# pylint: disable=[unused-argument]
+    """List a species' genetic markers."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        start_from = max(safe_int(request.args.get("start_from") or 0), 0)
+        count = safe_int(request.args.get("count") or 20)
+        return render_template("genotypes/list-markers.html",
+                               species=species,
+                               population=population,
+                               total_markers=genotype_markers_count(
+                                   conn, species["SpeciesId"]),
+                               start_from=start_from,
+                               count=count,
+                               markers=enumerate_sequence(
+                                   genotype_markers(conn,
+                                                    species["SpeciesId"],
+                                                    offset=start_from,
+                                                    limit=count),
+                                   start=start_from+1),
+                               activelink="list-markers")
+
+@genotypesbp.route(
+    "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/"
+    "<int:dataset_id>/view",
+    methods=["GET"])
+@require_login
+def view_dataset(species_id: int, population_id: int, dataset_id: int):
+    """View details regarding a specific dataset."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        species = species_by_id(conn, species_id)
+        if not bool(species):
+            flash("Invalid species provided!", "alert-danger")
+            return redirect(url_for("species.populations.genotypes.index"))
+
+        population = population_by_species_and_id(
+            conn, species_id, population_id)
+        if not bool(population):
+            flash("Invalid population selected!", "alert-danger")
+            return redirect(url_for(
+                "species.populations.genotypes.select_population",
+                species_id=species_id))
+
+        dataset = genotype_dataset(conn, species_id, population_id, dataset_id)
+        if not bool(dataset):
+            flash("Could not find such a dataset!", "alert-danger")
+            return redirect(url_for(
+                "species.populations.genotypes.list_genotypes",
+                species_id=species_id,
+                population_id=population_id))
+
+        return render_template("genotypes/view-dataset.html",
+                               species=species,
+                               population=population,
+                               dataset=dataset,
+                               activelink="view-dataset")
+
+
+@genotypesbp.route(
+    "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/"
+    "create",
+    methods=["GET", "POST"])
+@require_login
+@with_population(species_redirect_uri="species.populations.genotypes.index",
+                 redirect_uri="species.populations.genotypes.select_population")
+def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
+    """Create a genotype dataset."""
+    with (database_connection(app.config["SQL_URI"]) as conn,
+          conn.cursor(cursorclass=DictCursor) as cursor):
+        if request.method == "GET":
+            return render_template("genotypes/create-dataset.html",
+                                   species=species,
+                                   population=population,
+                                   activelink="create-dataset")
+
+        form = request.form
+        new_dataset = save_new_dataset(
+            cursor,
+            population["Id"],
+            form["geno-dataset-name"],
+            form["geno-dataset-fullname"],
+            form["geno-dataset-shortname"])
+
+        def __success__(_success):
+            flash("Successfully created genotype dataset.", "alert-success")
+            return redirect(url_for(
+                "species.populations.genotypes.list_genotypes",
+                species_id=species["SpeciesId"],
+                population_id=population["Id"]))
+
+        return oauth2_post(
+            "auth/resource/genotypes/create",
+            json={
+                **dict(request.form),
+                "species_id": species["SpeciesId"],
+                "population_id": population["Id"],
+                "dataset_id": new_dataset["Id"],
+                "dataset_name": form["geno-dataset-name"],
+                "dataset_fullname": form["geno-dataset-fullname"],
+                "dataset_shortname": form["geno-dataset-shortname"],
+                "public": "on"
+            }
+        ).either(
+            make_either_error_handler(
+                "There was an error creating the genotype dataset."),
+            __success__)