aboutsummaryrefslogtreecommitdiff
path: root/uploader/species
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/species')
-rw-r--r--uploader/species/__init__.py2
-rw-r--r--uploader/species/models.py152
-rw-r--r--uploader/species/views.py200
3 files changed, 354 insertions, 0 deletions
diff --git a/uploader/species/__init__.py b/uploader/species/__init__.py
new file mode 100644
index 0000000..83f2165
--- /dev/null
+++ b/uploader/species/__init__.py
@@ -0,0 +1,2 @@
+"""Package to handle creation and management of species."""
+from .views import speciesbp
diff --git a/uploader/species/models.py b/uploader/species/models.py
new file mode 100644
index 0000000..51f941c
--- /dev/null
+++ b/uploader/species/models.py
@@ -0,0 +1,152 @@
+"""Database functions for species."""
+import math
+from typing import Optional
+from functools import reduce
+
+import MySQLdb as mdb
+from MySQLdb.cursors import DictCursor
+
+def all_species(conn: mdb.Connection) -> tuple:
+ "Retrieve the species from the database."
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT Id AS SpeciesId, SpeciesName, LOWER(Name) AS Name, "
+ "MenuName, FullName, TaxonomyId, Family, FamilyOrderId, OrderId "
+ "FROM Species ORDER BY FamilyOrderId ASC, OrderID ASC")
+ return tuple(cursor.fetchall())
+
+ return tuple()
+
+def order_species_by_family(species: tuple[dict, ...]) -> list:
+ """Order the species by their family"""
+ def __family_order_id__(item):
+ orderid = item["FamilyOrderId"]
+ return math.inf if orderid is None else orderid
+ def __order__(ordered, current):
+ _key = (__family_order_id__(current), current["Family"])
+ return {
+ **ordered,
+ _key: ordered.get(_key, tuple()) + (current,)
+ }
+ ordered = reduce(__order__, species, {})# type: ignore[var-annotated]
+ return sorted(tuple(ordered.items()), key=lambda item: item[0][0])
+
+
+def species_by_id(conn: mdb.Connection, speciesid) -> dict:
+ "Retrieve the species from the database by id."
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT Id AS SpeciesId, SpeciesName, LOWER(Name) AS Name, "
+ "MenuName, FullName, TaxonomyId, Family, FamilyOrderId, OrderId "
+ "FROM Species WHERE SpeciesId=%s",
+ (speciesid,))
+ return cursor.fetchone()
+
+
+def save_species(conn: mdb.Connection,
+ common_name: str,
+ scientific_name: str,
+ family: str,
+ taxon_id: Optional[str] = None) -> dict:
+ """
+ Save a new species to the database.
+
+ Parameters
+ ----------
+ conn: A connection to the MariaDB database.
+ taxon_id: The taxonomy identifier for the new species.
+ common_name: The species' common name.
+ scientific_name; The species' scientific name.
+ """
+ genus, species_name = scientific_name.split(" ")
+ families = species_families(conn)
+ with conn.cursor() as cursor:
+ cursor.execute("SELECT MAX(OrderId) FROM Species")
+ species = {
+ "common_name": common_name,
+ "common_name_lower": common_name.lower(),
+ "menu_name": f"{common_name} ({genus[0]}. {species_name.lower()})",
+ "scientific_name": scientific_name,
+ "family": family,
+ "family_order": families[family],
+ "taxon_id": taxon_id,
+ "species_order": cursor.fetchone()[0] + 5
+ }
+ cursor.execute(
+ "INSERT INTO Species("
+ "SpeciesName, Name, MenuName, FullName, Family, FamilyOrderId, "
+ "TaxonomyId, OrderId"
+ ") VALUES ("
+ "%(common_name)s, %(common_name_lower)s, %(menu_name)s, "
+ "%(scientific_name)s, %(family)s, %(family_order)s, %(taxon_id)s, "
+ "%(species_order)s"
+ ")",
+ species)
+ species_id = cursor.lastrowid
+ cursor.execute("UPDATE Species SET SpeciesId=%s WHERE Id=%s",
+ (species_id, species_id))
+ return {
+ **species,
+ "species_id": species_id
+ }
+
+
+def update_species(# pylint: disable=[too-many-arguments]
+ conn: mdb.Connection,
+ species_id: int,
+ common_name: str,
+ scientific_name: str,
+ family: str,
+ family_order: int,
+ species_order: int
+):
+ """Update a species' details.
+
+ Parameters
+ ----------
+ conn: A connection to the MariaDB database.
+ species_id: The species identifier
+
+ Key-Word Arguments
+ ------------------
+ common_name: A layman's name for the species
+ scientific_name: A binomial nomenclature name for the species
+ family: The grouping under which the species falls
+ family_order: The ordering for the "family" above
+ species_order: The ordering of this species in relation to others
+ """
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ genus, species_name = scientific_name.split(" ")
+ species = {
+ "species_id": species_id,
+ "common_name": common_name,
+ "common_name_lower": common_name.lower(),
+ "menu_name": f"{common_name} ({genus[0]}. {species_name.lower()})",
+ "scientific_name": scientific_name,
+ "family": family,
+ "family_order": family_order,
+ "species_order": species_order
+ }
+ cursor.execute(
+ "UPDATE Species SET "
+ "SpeciesName=%(common_name)s, "
+ "Name=%(common_name_lower)s, "
+ "MenuName=%(menu_name)s, "
+ "FullName=%(scientific_name)s, "
+ "Family=%(family)s, "
+ "FamilyOrderId=%(family_order)s, "
+ "OrderId=%(species_order)s "
+ "WHERE Id=%(species_id)s",
+ species)
+
+
+def species_families(conn: mdb.Connection) -> dict:
+ """Retrieve the families under which species are grouped."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT DISTINCT(Family), FamilyOrderId FROM Species "
+ "WHERE Family IS NOT NULL")
+ return {
+ fam["Family"]: fam["FamilyOrderId"]
+ for fam in cursor.fetchall()
+ }
diff --git a/uploader/species/views.py b/uploader/species/views.py
new file mode 100644
index 0000000..10715a5
--- /dev/null
+++ b/uploader/species/views.py
@@ -0,0 +1,200 @@
+"""Endpoints handling species."""
+from pymonad.either import Left, Right, Either
+from flask import (flash,
+ request,
+ url_for,
+ redirect,
+ Blueprint,
+ current_app as app)
+
+from uploader.population import popbp
+from uploader.platforms import platformsbp
+from uploader.ui import make_template_renderer
+from uploader.db_utils import database_connection
+from uploader.oauth2.client import oauth2_get, oauth2_post
+from uploader.authorisation import require_login, require_token
+from uploader.datautils import order_by_family, enumerate_sequence
+
+from .models import (all_species,
+ save_species,
+ species_by_id,
+ update_species,
+ species_families)
+
+
+speciesbp = Blueprint("species", __name__)
+speciesbp.register_blueprint(popbp, url_prefix="/")
+speciesbp.register_blueprint(platformsbp, url_prefix="/")
+render_template = make_template_renderer("species")
+
+
+@speciesbp.route("/", methods=["GET"])
+@require_login
+def list_species():
+ """List and display all the species in the database."""
+ with database_connection(app.config["SQL_URI"]) as conn:
+ return render_template("species/list-species.html",
+ allspecies=enumerate_sequence(all_species(conn)))
+
+@speciesbp.route("/<int:species_id>", methods=["GET"])
+@require_login
+def view_species(species_id: int):
+ """View details of a particular species and menus to act upon it."""
+ with database_connection(app.config["SQL_URI"]) as conn:
+ species = species_by_id(conn, species_id)
+ if bool(species):
+ return render_template("species/view-species.html",
+ species=species,
+ activelink="view-species")
+ flash("Could not find a species with the given identifier.",
+ "alert-danger")
+ return redirect(url_for("species.view_species"))
+
+@speciesbp.route("/create", methods=["GET", "POST"])
+@require_login
+def create_species():
+ """Create a new species."""
+ # We can use uniprot's API to fetch the details with something like
+ # https://rest.uniprot.org/taxonomy/<taxonID> e.g.
+ # https://rest.uniprot.org/taxonomy/6239
+ with (database_connection(app.config["SQL_URI"]) as conn,
+ conn.cursor() as cursor):
+ if request.method == "GET":
+ return render_template("species/create-species.html",
+ families=species_families(conn),
+ activelink="create-species")
+
+ error = False
+ taxon_id = request.form.get("species_taxonomy_id", "").strip() or None
+
+ common_name = request.form.get("common_name", "").strip()
+ if not bool(common_name):
+ flash("The common species name MUST be provided.", "alert-danger")
+ error = True
+
+ scientific_name = request.form.get("scientific_name", "").strip()
+ if not bool(scientific_name):
+ flash("The species' scientific name MUST be provided.",
+ "alert-danger")
+ error = True
+
+ parts = tuple(name.strip() for name in scientific_name.split(" "))
+ if len(parts) != 2 or not all(bool(name) for name in parts):
+ flash("The scientific name you provided is invalid.", "alert-danger")
+ error = True
+
+ cursor.execute(
+ "SELECT * FROM Species WHERE FullName=%s", (scientific_name,))
+ res = cursor.fetchone()
+ if bool(res):
+ flash("A species already exists with the provided scientific name.",
+ "alert-danger")
+ error = True
+
+ family = request.form.get("species_family", "").strip()
+ if not bool(family):
+ flash("The species' family MUST be selected.", "alert-danger")
+ error = True
+
+ if bool(taxon_id):
+ cursor.execute(
+ "SELECT * FROM Species WHERE TaxonomyId=%s", (taxon_id,))
+ res = cursor.fetchone()
+ if bool(res):
+ flash("A species already exists with the provided scientific name.",
+ "alert-danger")
+ error = True
+
+ if error:
+ return redirect(url_for("species.create_species",
+ common_name=common_name,
+ scientific_name=scientific_name,
+ taxon_id=taxon_id))
+
+ species = save_species(
+ conn, common_name, scientific_name, family, taxon_id)
+ flash("Species saved successfully!", "alert-success")
+ return redirect(url_for("species.view_species", species_id=species["species_id"]))
+
+
+@speciesbp.route("/<int:species_id>/edit-extra", methods=["GET", "POST"])
+@require_login
+@require_token
+#def edit_species(species_id: int):
+def edit_species_extra(token: dict, species_id: int):# pylint: disable=[unused-argument]
+ """Edit a species' details.
+
+ Parameters
+ ----------
+ token: A JWT token used for authorisation.
+ species_id: An identifier for the species being edited.
+ """
+ def __failure__(res):
+ app.logger.debug(
+ "There was an error in the attempt to edit the species: %s", res)
+ flash(res, "alert-danger")
+ return redirect(url_for("species.view_species", species_id=species_id))
+
+ def __system_resource_uuid__(resources) -> Either:
+ sys_res = [
+ resource for resource in resources
+ if resource["resource_category"]["resource_category_key"] == "system"
+ ]
+ if len(sys_res) != 1:
+ return Left("Could not find/identify a valid system resource.")
+ return Right(sys_res[0]["resource_id"])
+
+ def __check_privileges__(authorisations):
+ if len(authorisations.items()) != 1:
+ return Left("Got authorisations for more than a single resource!")
+
+ auths = tuple(authorisations.items())[0][1]
+ authorised = "system:species:edit-extra-info" in tuple(
+ privilege["privilege_id"]
+ for role in auths["roles"]
+ for privilege in role["privileges"])
+ if authorised:
+ return Right(authorised)
+ return Left("You are not authorised to edit species extra details.")
+
+ with database_connection(app.config["SQL_URI"]) as conn:
+ species = species_by_id(conn, species_id)
+ all_the_species = all_species(conn)
+ families = species_families(conn)
+ family_order = tuple(
+ item[0] for item in order_by_family(all_the_species)
+ if item[0][1] is not None)
+ if bool(species) and request.method == "GET":
+ return oauth2_get("auth/user/resources").then(
+ __system_resource_uuid__
+ ).then(
+ lambda resource_id: oauth2_post(
+ "auth/resource/authorisation",
+ json={"resource-ids": [resource_id]})
+ ).then(__check_privileges__).then(
+ lambda authorisations: render_template(
+ "species/edit-species.html",
+ species=species,
+ families=families,
+ family_order=family_order,
+ max_order_id = max(
+ row["OrderId"] for row in all_the_species
+ if row["OrderId"] is not None),
+ activelink="edit-species")
+ ).either(__failure__, lambda res: res)
+
+ if bool(species) and request.method == "POST":
+ update_species(conn,
+ species_id,
+ request.form["species_name"],
+ request.form["species_fullname"],
+ request.form["species_family"],
+ int(request.form["species_familyorderid"]),
+ int(request.form["species_orderid"]))
+ flash("Updated species successfully.", "alert-success")
+ return redirect(url_for("species.edit_species_extra",
+ species_id=species_id))
+
+ flash("Species with the given identifier was not found!",
+ "alert-danger")
+ return redirect(url_for("species.list_species"))