about summary refs log tree commit diff
path: root/uploader/genotypes
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/genotypes')
-rw-r--r--uploader/genotypes/__init__.py1
-rw-r--r--uploader/genotypes/models.py114
-rw-r--r--uploader/genotypes/views.py178
3 files changed, 293 insertions, 0 deletions
diff --git a/uploader/genotypes/__init__.py b/uploader/genotypes/__init__.py
new file mode 100644
index 0000000..d0025d6
--- /dev/null
+++ b/uploader/genotypes/__init__.py
@@ -0,0 +1 @@
+"""The Genotypes module."""
diff --git a/uploader/genotypes/models.py b/uploader/genotypes/models.py
new file mode 100644
index 0000000..34d2cfe
--- /dev/null
+++ b/uploader/genotypes/models.py
@@ -0,0 +1,114 @@
+"""Functions for handling genotypes."""
+from typing import Optional
+from datetime import datetime
+
+import MySQLdb as mdb
+from MySQLdb.cursors import Cursor, DictCursor
+from flask import current_app as app
+
+from gn_libs.mysqldb import debug_query
+
+def genocode_by_population(
+        conn: mdb.Connection, population_id: int) -> tuple[dict, ...]:
+    """Get the allele/genotype codes."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s",
+                       (population_id,))
+        return tuple(dict(item) for item in cursor.fetchall())
+
+
+def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int:
+    """Find the total count of the genotype markers for a species."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(
+            "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s",
+            (species_id,))
+        return int(cursor.fetchone()["markers_count"])
+
+
+def genotype_markers(
+        conn: mdb.Connection,
+        species_id: int,
+        offset: int = 0,
+        limit: Optional[int] = None
+) -> tuple[tuple[dict, ...], int]:
+    """Retrieve markers from the database."""
+    _query_template = (
+        "SELECT %%COLS%% FROM Geno AS gno "
+        "WHERE gno.SpeciesId=%s "
+        "%%LIMIT%%")
+
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(
+            _query_template.replace("%%LIMIT%%", "").replace(
+                "%%COLS%%", "COUNT(gno.Id) AS total_records"),
+            (species_id,))
+        _total_records = cursor.fetchone()["total_records"]
+        cursor.execute(
+            _query_template.replace("%%COLS%%", "gno.*").replace(
+                "%%LIMIT%%",
+                (f"LIMIT {int(limit)} OFFSET {int(offset)}"
+                 if bool(limit) and limit > 0
+                 else "")),
+            (species_id,))
+        debug_query(cursor, app.logger)
+        return tuple(dict(row) for row in cursor.fetchall()), _total_records
+
+
+def genotype_dataset(
+        conn: mdb.Connection,
+        species_id: int,
+        population_id: int,
+        dataset_id: Optional[int] = None
+) -> Optional[dict]:
+    """Retrieve genotype datasets from the database.
+
+    Apparently, you should only ever have one genotype dataset for a population.
+    """
+    _query = (
+        "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset "
+        "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf "
+        "ON iset.Id=gf.InbredSetId "
+        "WHERE s.Id=%s AND iset.Id=%s")
+    _params = (species_id, population_id)
+    if bool(dataset_id):
+        _query = _query + " AND gf.Id=%s"
+        _params = _params + (dataset_id,)# type: ignore[assignment]
+
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(_query, _params)
+        debug_query(cursor, app.logger)
+        result = cursor.fetchone()
+        if bool(result):
+            return dict(result)
+        return None
+
+
+def save_new_dataset(
+        cursor: Cursor,
+        population_id: int,
+        name: str,
+        fullname: str,
+        shortname: str
+) -> dict:
+    """Save a new genotype dataset into the database."""
+    params = {
+        "InbredSetId": population_id,
+        "Name": name,
+        "FullName": fullname,
+        "ShortName": shortname,
+        "CreateTime": datetime.now().date().isoformat(),
+        "public": 2,
+        "confidentiality": 0,
+        "AuthorisedUsers": None
+    }
+    cursor.execute(
+        "INSERT INTO GenoFreeze("
+        "Name, FullName, ShortName, CreateTime, public, InbredSetId, "
+        "confidentiality, AuthorisedUsers"
+        ") VALUES ("
+        "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, "
+        "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s"
+        ")",
+        params)
+    return {**params, "Id": cursor.lastrowid}
diff --git a/uploader/genotypes/views.py b/uploader/genotypes/views.py
new file mode 100644
index 0000000..f27671c
--- /dev/null
+++ b/uploader/genotypes/views.py
@@ -0,0 +1,178 @@
+"""Views for the genotypes."""
+import logging
+
+from MySQLdb.cursors import DictCursor
+from pymonad.either import Left, Right, Either
+from gn_libs.mysqldb import database_connection
+from flask import (flash,
+                   request,
+                   jsonify,
+                   redirect,
+                   Blueprint,
+                   render_template,
+                   current_app as app)
+
+from uploader.flask_extensions import url_for
+from uploader.ui import make_template_renderer
+from uploader.oauth2.client import oauth2_post
+from uploader.authorisation import require_login
+from uploader.route_utils import generic_select_population
+from uploader.datautils import safe_int, enumerate_sequence
+from uploader.species.models import all_species, species_by_id
+from uploader.monadic_requests import make_either_error_handler
+from uploader.population.models import population_by_species_and_id
+from uploader.request_checks import with_species, with_dataset, with_population
+
+from .models import (genotype_markers,
+                     genotype_dataset,
+                     save_new_dataset,
+                     genotype_markers_count,
+                     genocode_by_population)
+
+logger = logging.getLogger(__name__)
+genotypesbp = Blueprint("genotypes", __name__)
+render_template = make_template_renderer("genotypes")
+
+
+@genotypesbp.route(
+    "/<int:species_id>/populations/<int:population_id>/genotypes",
+    methods=["GET"])
+@require_login
+@with_population(species_redirect_uri="species.list_species",
+                 redirect_uri="species.populations.list_species_populations")
+def list_genotypes(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
+    """List genotype details for species and population."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        return render_template("genotypes/list-genotypes.html",
+                               species=species,
+                               population=population,
+                               genocode=genocode_by_population(
+                                   conn, population["Id"]),
+                               total_markers=genotype_markers_count(
+                                   conn, species["SpeciesId"]),
+                               dataset=genotype_dataset(conn,
+                                                        species["SpeciesId"],
+                                                        population["Id"]),
+                               activelink="list-genotypes")
+
+
+@genotypesbp.route(
+    "/<int:species_id>/populations/<int:population_id>/genotypes/<int:dataset_id>/list-markers",
+    methods=["GET"])
+@require_login
+@with_species(redirect_uri="species.populations.genotypes.list_genotypes")
+def list_markers(species: dict, **_kwargs):
+    """List the markers that exist for this species."""
+    args = request.args
+    offset = int(args.get("start") or 0)
+    with database_connection(app.config["SQL_URI"]) as conn:
+        markers, total_records = genotype_markers(
+            conn,
+            species["SpeciesId"],
+            offset=offset,
+            limit=int(args.get("length") or 0))
+        return jsonify({
+            **({"draw": int(args.get("draw"))}
+               if bool(args.get("draw") or False)
+               else {}),
+            "recordsTotal": total_records,
+            "recordsFiltered": len(markers),
+            "markers": tuple({**marker, "index": idx}
+                             for idx, marker in
+                             enumerate(markers, start=offset+1))
+        })
+
+
+@genotypesbp.route(
+    "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/"
+    "<int:dataset_id>/view",
+    methods=["GET"])
+@require_login
+def view_dataset(species_id: int, population_id: int, dataset_id: int):
+    """View details regarding a specific dataset."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        species = species_by_id(conn, species_id)
+        if not bool(species):
+            flash("Invalid species provided!", "alert-danger")
+            return redirect(url_for("species.list_species"))
+
+        population = population_by_species_and_id(
+            conn, species_id, population_id)
+        if not bool(population):
+            flash("Invalid population selected!", "alert-danger")
+            return redirect(url_for(
+                "species.populations.list_species_populations",
+                species_id=species_id))
+
+        dataset = genotype_dataset(conn, species_id, population_id, dataset_id)
+        if not bool(dataset):
+            flash("Could not find such a dataset!", "alert-danger")
+            return redirect(url_for(
+                "species.populations.genotypes.list_genotypes",
+                species_id=species_id,
+                population_id=population_id))
+
+        return render_template("genotypes/view-dataset.html",
+                               species=species,
+                               population=population,
+                               dataset=dataset,
+                               activelink="view-dataset")
+
+
+@genotypesbp.route(
+    "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/"
+    "create",
+    methods=["GET", "POST"])
+@require_login
+@with_population(species_redirect_uri="species.list_species",
+                 redirect_uri="species.populations.list_species_populations")
+def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
+    """Create a genotype dataset."""
+    if request.method == "GET":
+        return render_template("genotypes/create-dataset.html",
+                               species=species,
+                               population=population,
+                               activelink="create-dataset")
+
+    with (database_connection(app.config["SQL_URI"]) as conn,
+          conn.cursor(cursorclass=DictCursor) as cursor):
+
+        def __save_dataset__() -> Either:
+            form = request.form
+            try:
+                return Right(save_new_dataset(
+                    cursor,
+                    population["Id"],
+                    form["geno-dataset-name"],
+                    form["geno-dataset-fullname"],
+                    form["geno-dataset-shortname"]))
+            except Exception:
+                msg = "Error adding new Genotype dataset to database."
+                logger.error(msg, exc_info=True)
+                return Left(Exception(msg))
+
+        def __success__(_success):
+            flash("Successfully created genotype dataset.", "alert-success")
+            return redirect(url_for(
+                "species.populations.genotypes.list_genotypes",
+                species_id=species["SpeciesId"],
+                population_id=population["Id"]))
+
+        return __save_dataset__().then(
+            lambda new_dataset: oauth2_post(
+                "auth/resource/genotypes/create",
+                json={
+                    **dict(request.form),
+                    "species_id": species["SpeciesId"],
+                    "population_id": population["Id"],
+                    "dataset_id": new_dataset["Id"],
+                    "dataset_name": new_dataset["Name"],
+                    "dataset_fullname": new_dataset["FullName"],
+                    "dataset_shortname": new_dataset["ShortName"],
+                    "public": "on"
+                }
+            )
+        ).either(
+            make_either_error_handler(
+                "There was an error creating the genotype dataset."),
+            __success__)