aboutsummaryrefslogtreecommitdiff
path: root/uploader/genotypes
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/genotypes')
-rw-r--r--uploader/genotypes/models.py70
-rw-r--r--uploader/genotypes/views.py146
2 files changed, 162 insertions, 54 deletions
diff --git a/uploader/genotypes/models.py b/uploader/genotypes/models.py
index 642caa6..44c98b1 100644
--- a/uploader/genotypes/models.py
+++ b/uploader/genotypes/models.py
@@ -1,8 +1,9 @@
"""Functions for handling genotypes."""
from typing import Optional
+from datetime import datetime
import MySQLdb as mdb
-from MySQLdb.cursors import DictCursor
+from MySQLdb.cursors import Cursor, DictCursor
from uploader.db_utils import debug_query
@@ -32,7 +33,7 @@ def genotype_markers(
) -> tuple[dict, ...]:
"""Retrieve markers from the database."""
_query = "SELECT * FROM Geno WHERE SpeciesId=%s"
- if bool(limit) and limit > 0:
+ if bool(limit) and limit > 0:# type: ignore[operator]
_query = _query + f" LIMIT {limit} OFFSET {offset}"
with conn.cursor(cursorclass=DictCursor) as cursor:
@@ -41,17 +42,60 @@ def genotype_markers(
return tuple(dict(row) for row in cursor.fetchall())
-def genotype_datasets(
+def genotype_dataset(
conn: mdb.Connection,
species_id: int,
- population_id: int
-) -> tuple[dict, ...]:
- """Retrieve genotype datasets from the database."""
+ population_id: int,
+ dataset_id: Optional[int] = None
+) -> Optional[dict]:
+ """Retrieve genotype datasets from the database.
+
+ Apparently, you should only ever have one genotype dataset for a population.
+ """
+ _query = (
+ "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset "
+ "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf "
+ "ON iset.Id=gf.InbredSetId "
+ "WHERE s.Id=%s AND iset.Id=%s")
+ _params = (species_id, population_id)
+ if bool(dataset_id):
+ _query = _query + " AND gf.Id=%s"
+ _params = _params + (dataset_id,)# type: ignore[assignment]
+
with conn.cursor(cursorclass=DictCursor) as cursor:
- cursor.execute(
- "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset "
- "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf "
- "ON iset.Id=gf.InbredSetId "
- "WHERE s.Id=%s AND iset.Id=%s",
- (species_id, population_id))
- return tuple(dict(row) for row in cursor.fetchall())
+ cursor.execute(_query, _params)
+ debug_query(cursor)
+ result = cursor.fetchone()
+ if bool(result):
+ return dict(result)
+ return None
+
+
+def save_new_dataset(
+ cursor: Cursor,
+ population_id: int,
+ name: str,
+ fullname: str,
+ shortname: str
+) -> dict:
+ """Save a new genotype dataset into the database."""
+ params = {
+ "InbredSetId": population_id,
+ "Name": name,
+ "FullName": fullname,
+ "ShortName": shortname,
+ "CreateTime": datetime.now().date().isoformat(),
+ "public": 2,
+ "confidentiality": 0,
+ "AuthorisedUsers": None
+ }
+ cursor.execute(
+ "INSERT INTO GenoFreeze("
+ "Name, FullName, ShortName, CreateTime, public, InbredSetId, "
+ "confidentiality, AuthorisedUsers"
+ ") VALUES ("
+ "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, "
+ "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s"
+ ")",
+ params)
+ return {**params, "Id": cursor.lastrowid}
diff --git a/uploader/genotypes/views.py b/uploader/genotypes/views.py
index 294da0e..0821eca 100644
--- a/uploader/genotypes/views.py
+++ b/uploader/genotypes/views.py
@@ -1,4 +1,5 @@
"""Views for the genotypes."""
+from MySQLdb.cursors import DictCursor
from flask import (flash,
request,
url_for,
@@ -7,19 +8,25 @@ from flask import (flash,
render_template,
current_app as app)
+from uploader.ui import make_template_renderer
+from uploader.oauth2.client import oauth2_post
from uploader.authorisation import require_login
from uploader.db_utils import database_connection
from uploader.species.models import all_species, species_by_id
+from uploader.monadic_requests import make_either_error_handler
+from uploader.request_checks import with_species, with_population
from uploader.datautils import safe_int, order_by_family, enumerate_sequence
from uploader.population.models import (populations_by_species,
population_by_species_and_id)
from .models import (genotype_markers,
- genotype_datasets,
+ genotype_dataset,
+ save_new_dataset,
genotype_markers_count,
genocode_by_population)
genotypesbp = Blueprint("genotypes", __name__)
+render_template = make_template_renderer("genotypes")
@genotypesbp.route("populations/genotypes", methods=["GET"])
@require_login
@@ -42,14 +49,10 @@ def index():
@genotypesbp.route("/<int:species_id>/populations/genotypes/select-population",
methods=["GET"])
@require_login
-def select_population(species_id: int):
+@with_species(redirect_uri="species.populations.genotypes.index")
+def select_population(species: dict, species_id: int):
"""Select the population under which the genotypes go."""
with database_connection(app.config["SQL_URI"]) as conn:
- species = species_by_id(conn, species_id)
- if not bool(species):
- flash("Invalid species provided!", "alert-danger")
- return redirect(url_for("species.populations.genotypes.index"))
-
if not bool(request.args.get("population_id")):
return render_template("genotypes/select-population.html",
species=species,
@@ -75,58 +78,44 @@ def select_population(species_id: int):
"/<int:species_id>/populations/<int:population_id>/genotypes",
methods=["GET"])
@require_login
-def list_genotypes(species_id: int, population_id: int):
+@with_population(species_redirect_uri="species.populations.genotypes.index",
+ redirect_uri="species.populations.genotypes.select_population")
+def list_genotypes(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
"""List genotype details for species and population."""
with database_connection(app.config["SQL_URI"]) as conn:
- species = species_by_id(conn, species_id)
- if not bool(species):
- flash("Invalid species provided!", "alert-danger")
- return redirect(url_for("species.populations.genotypes.index"))
-
- population = population_by_species_and_id(
- conn, species_id, population_id)
- if not bool(population):
- flash("Invalid population selected!", "alert-danger")
- return redirect(url_for(
- "species.populations.genotypes.select_population",
- species_id=species_id))
-
return render_template("genotypes/list-genotypes.html",
species=species,
population=population,
genocode=genocode_by_population(
- conn, population_id),
+ conn, population["Id"]),
total_markers=genotype_markers_count(
- conn, species_id),
- datasets=genotype_datasets(
- conn, species_id, population_id),
+ conn, species["SpeciesId"]),
+ dataset=genotype_dataset(conn,
+ species["SpeciesId"],
+ population["Id"]),
activelink="list-genotypes")
@genotypesbp.route("/<int:species_id>/genotypes/list-markers", methods=["GET"])
@require_login
-def list_markers(species_id: int):
+@with_species(redirect_uri="species.populations.genotypes.index")
+def list_markers(species: dict, **kwargs):# pylint: disable=[unused-argument]
"""List a species' genetic markers."""
with database_connection(app.config["SQL_URI"]) as conn:
- species = species_by_id(conn, species_id)
- if not bool(species):
- flash("Invalid species provided!", "alert-danger")
- return redirect(url_for("species.populations.genotypes.index"))
-
- start_from = safe_int(request.args.get("start_from") or 0)
- if start_from < 0:
- start_from = 0
+ start_from = max(safe_int(request.args.get("start_from") or 0), 0)
count = safe_int(request.args.get("count") or 20)
- markers = enumerate_sequence(
- genotype_markers(conn, species_id, offset=start_from, limit=count),
- start=start_from+1)
return render_template("genotypes/list-markers.html",
species=species,
total_markers=genotype_markers_count(
- conn, species_id),
+ conn, species["SpeciesId"]),
start_from=start_from,
count=count,
- markers=markers,
+ markers=enumerate_sequence(
+ genotype_markers(conn,
+ species["SpeciesId"],
+ offset=start_from,
+ limit=count),
+ start=start_from+1),
activelink="list-markers")
@genotypesbp.route(
@@ -136,5 +125,80 @@ def list_markers(species_id: int):
@require_login
def view_dataset(species_id: int, population_id: int, dataset_id: int):
"""View details regarding a specific dataset."""
- return (f"Genotype dataset '{dataset_id}, from population '{population_id}' "
- f"of species '{species_id}'.")
+ with database_connection(app.config["SQL_URI"]) as conn:
+ species = species_by_id(conn, species_id)
+ if not bool(species):
+ flash("Invalid species provided!", "alert-danger")
+ return redirect(url_for("species.populations.genotypes.index"))
+
+ population = population_by_species_and_id(
+ conn, species_id, population_id)
+ if not bool(population):
+ flash("Invalid population selected!", "alert-danger")
+ return redirect(url_for(
+ "species.populations.genotypes.select_population",
+ species_id=species_id))
+
+ dataset = genotype_dataset(conn, species_id, population_id, dataset_id)
+ if not bool(dataset):
+ flash("Could not find such a dataset!", "alert-danger")
+ return redirect(url_for(
+ "species.populations.genotypes.list_genotypes",
+ species_id=species_id,
+ population_id=population_id))
+
+ return render_template("genotypes/view-dataset.html",
+ species=species,
+ population=population,
+ dataset=dataset,
+ activelink="view-dataset")
+
+
+@genotypesbp.route(
+ "/<int:species_id>/populations/<int:population_id>/genotypes/datasets/"
+ "create",
+ methods=["GET", "POST"])
+@require_login
+@with_population(species_redirect_uri="species.populations.genotypes.index",
+ redirect_uri="species.populations.genotypes.select_population")
+def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
+ """Create a genotype dataset."""
+ with (database_connection(app.config["SQL_URI"]) as conn,
+ conn.cursor(cursorclass=DictCursor) as cursor):
+ if request.method == "GET":
+ return render_template("genotypes/create-dataset.html",
+ species=species,
+ population=population,
+ activelink="create-dataset")
+
+ form = request.form
+ new_dataset = save_new_dataset(
+ cursor,
+ population["Id"],
+ form["geno-dataset-name"],
+ form["geno-dataset-fullname"],
+ form["geno-dataset-shortname"])
+
+ def __success__(_success):
+ flash("Successfully created genotype dataset.", "alert-success")
+ return redirect(url_for(
+ "species.populations.genotypes.list_genotypes",
+ species_id=species["SpeciesId"],
+ population_id=population["Id"]))
+
+ return oauth2_post(
+ "auth/resource/genotypes/create",
+ json={
+ **dict(request.form),
+ "species_id": species["SpeciesId"],
+ "population_id": population["Id"],
+ "dataset_id": new_dataset["Id"],
+ "dataset_name": form["geno-dataset-name"],
+ "dataset_fullname": form["geno-dataset-fullname"],
+ "dataset_shortname": form["geno-dataset-shortname"],
+ "public": "on"
+ }
+ ).either(
+ make_either_error_handler(
+ "There was an error creating the genotype dataset."),
+ __success__)